crosvm: remove balloon stats request timeout

It was done to avoid deadlock when stats are requested before guest is
up. Implement a stub BalloonStats::NotReady replier until host is up so
that timeout is no longer necessary.

BUG=b:232289535
TEST=few tast crostini/arc tests

Change-Id: I6731b4ee9eaecdd65aebdd3f530f0932b0660c85
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3652887
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Anton Romanov <romanton@google.com>
Auto-Submit: Anton Romanov <romanton@google.com>
This commit is contained in:
Anton Romanov 2022-05-17 19:35:47 +00:00 committed by Chromeos LUCI
parent ace7edca0e
commit 56c0d02760
4 changed files with 100 additions and 25 deletions

View file

@ -55,4 +55,7 @@ pub enum BalloonTubeResult {
Adjusted { Adjusted {
num_bytes: u64, num_bytes: u64,
}, },
NotReady {
id: u64,
},
} }

View file

@ -7,14 +7,17 @@ use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use futures::{channel::mpsc, pin_mut, StreamExt}; use futures::{
channel::{mpsc, oneshot},
pin_mut, StreamExt,
};
use remain::sorted; use remain::sorted;
use thiserror::Error as ThisError; use thiserror::Error as ThisError;
use balloon_control::{BalloonStats, BalloonTubeCommand, BalloonTubeResult}; use balloon_control::{BalloonStats, BalloonTubeCommand, BalloonTubeResult};
use base::{self, error, warn, AsRawDescriptor, Event, RawDescriptor, Tube}; use base::{self, error, trace, warn, AsRawDescriptor, Event, RawDescriptor, Tube};
use cros_async::{ use cros_async::{
block_on, select6, select7, sync::Mutex as AsyncMutex, AsyncTube, EventAsync, Executor, block_on, select2, select6, select7, sync::Mutex as AsyncMutex, AsyncTube, EventAsync, Executor,
}; };
use data_model::{DataInit, Le16, Le32, Le64}; use data_model::{DataInit, Le16, Le32, Le64};
use vm_memory::{GuestAddress, GuestMemory}; use vm_memory::{GuestAddress, GuestMemory};
@ -34,6 +37,9 @@ pub enum BalloonError {
/// Failed to create async message receiver. /// Failed to create async message receiver.
#[error("failed to create async message receiver: {0}")] #[error("failed to create async message receiver: {0}")]
CreatingMessageReceiver(base::TubeError), CreatingMessageReceiver(base::TubeError),
/// IO Error.
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
/// Failed to receive command message. /// Failed to receive command message.
#[error("failed to receive command message: {0}")] #[error("failed to receive command message: {0}")]
ReceivingCommand(base::TubeError), ReceivingCommand(base::TubeError),
@ -44,6 +50,7 @@ pub enum BalloonError {
#[error("failed to write config event: {0}")] #[error("failed to write config event: {0}")]
WritingConfigEvent(base::Error), WritingConfigEvent(base::Error),
} }
pub type Result<T> = std::result::Result<T, BalloonError>; pub type Result<T> = std::result::Result<T, BalloonError>;
// Balloon implements four virt IO queues: Inflate, Deflate, Stats, Event. // Balloon implements four virt IO queues: Inflate, Deflate, Stats, Event.
@ -300,6 +307,7 @@ async fn handle_stats_queue(
} }
Ok(d) => d.index, Ok(d) => d.index,
}; };
loop { loop {
// Wait for a request to read the stats. // Wait for a request to read the stats.
let id = match stats_rx.next().await { let id = match stats_rx.next().await {
@ -444,6 +452,30 @@ async fn handle_command_tube(
} }
} }
// Stub worker thread to reply with `NotReady`
fn run_stub_worker(signal: oneshot::Receiver<()>, command_tube: Tube) -> Tube {
trace!("spawning ballon_stats stub worker");
let ex = Executor::new().unwrap();
let command_tube = AsyncTube::new(&ex, command_tube).unwrap();
{
let stub_replier = async {
loop {
if let Ok(BalloonTubeCommand::Stats { id }) = command_tube.next().await {
trace!("Sending not ready response to balloon stats request");
let res = BalloonTubeResult::NotReady { id };
if let Err(e) = command_tube.send(res).await {
error!("failed to send stats result: {}", e);
}
}
}
};
pin_mut!(stub_replier);
let _ = ex.run_until(select2(signal, stub_replier));
}
command_tube.into()
}
// The main worker thread. Initialized the asynchronous worker tasks and passes them to the executor // The main worker thread. Initialized the asynchronous worker tasks and passes them to the executor
// to be processed. // to be processed.
fn run_worker( fn run_worker(
@ -556,13 +588,15 @@ fn run_worker(
/// Virtio device for memory balloon inflation/deflation. /// Virtio device for memory balloon inflation/deflation.
pub struct Balloon { pub struct Balloon {
command_tube: Tube, command_tube: Option<Tube>,
inflate_tube: Option<Tube>, inflate_tube: Option<Tube>,
state: Arc<AsyncMutex<BalloonState>>, state: Arc<AsyncMutex<BalloonState>>,
features: u64, features: u64,
acked_features: u64, acked_features: u64,
kill_evt: Option<Event>, kill_evt: Option<Event>,
worker_thread: Option<thread::JoinHandle<Option<Tube>>>, worker_thread: Option<thread::JoinHandle<Option<Tube>>>,
stub_worker_thread: Option<thread::JoinHandle<Tube>>,
stub_signal: Option<oneshot::Sender<()>>,
} }
/// Operation mode of the balloon. /// Operation mode of the balloon.
@ -598,7 +632,7 @@ impl Balloon {
}; };
Ok(Balloon { Ok(Balloon {
command_tube, command_tube: Some(command_tube),
inflate_tube, inflate_tube,
state: Arc::new(AsyncMutex::new(BalloonState { state: Arc::new(AsyncMutex::new(BalloonState {
num_pages: (init_balloon_size >> VIRTIO_BALLOON_PFN_SHIFT) as u32, num_pages: (init_balloon_size >> VIRTIO_BALLOON_PFN_SHIFT) as u32,
@ -607,8 +641,10 @@ impl Balloon {
})), })),
kill_evt: None, kill_evt: None,
worker_thread: None, worker_thread: None,
stub_worker_thread: None,
features, features,
acked_features: 0, acked_features: 0,
stub_signal: None,
}) })
} }
@ -640,11 +676,11 @@ impl Drop for Balloon {
impl VirtioDevice for Balloon { impl VirtioDevice for Balloon {
fn keep_rds(&self) -> Vec<RawDescriptor> { fn keep_rds(&self) -> Vec<RawDescriptor> {
let mut rds = vec![self.command_tube.as_raw_descriptor()]; self.command_tube
if let Some(inflate_tube) = &self.inflate_tube { .iter()
rds.push(inflate_tube.as_raw_descriptor()); .chain(self.inflate_tube.iter())
} .map(AsRawDescriptor::as_raw_descriptor)
rds .collect()
} }
fn device_type(&self) -> DeviceType { fn device_type(&self) -> DeviceType {
@ -667,8 +703,12 @@ impl VirtioDevice for Balloon {
if state.failable_update && state.actual_pages == state.num_pages { if state.failable_update && state.actual_pages == state.num_pages {
state.failable_update = false; state.failable_update = false;
if let Err(e) = send_adjusted_response(&self.command_tube, state.num_pages) { if let Some(ref command_tube) = self.command_tube {
error!("Failed to send response {:?}", e); if let Err(e) = send_adjusted_response(command_tube, state.num_pages) {
error!("Failed to send response {:?}", e);
}
} else {
panic!("Command tube missing!");
} }
} }
} }
@ -685,6 +725,21 @@ impl VirtioDevice for Balloon {
self.acked_features |= value; self.acked_features |= value;
} }
fn on_device_sandboxed(&mut self) {
if let Some(command_tube) = self.command_tube.take() {
let (tx, rx) = oneshot::channel();
let worker_stub_thread = thread::Builder::new()
.name("virtio_balloon_stub".to_string())
.spawn(move || run_stub_worker(rx, command_tube))
.expect("Failed to spawn balloon stub worker thread");
let _ = self.stub_worker_thread.insert(worker_stub_thread);
let _ = self.stub_signal.insert(tx);
} else {
panic!("Command tube missing!");
}
}
fn activate( fn activate(
&mut self, &mut self,
mem: GuestMemory, mem: GuestMemory,
@ -692,6 +747,13 @@ impl VirtioDevice for Balloon {
queues: Vec<Queue>, queues: Vec<Queue>,
queue_evts: Vec<Event>, queue_evts: Vec<Event>,
) { ) {
// Kill stub thread
std::mem::drop(self.stub_signal.take());
if let Some(Ok(handle)) = self.stub_worker_thread.take().map(thread::JoinHandle::join) {
let _ = self.command_tube.insert(handle);
}
let expected_queues = if self.event_queue_enabled() { 4 } else { 3 }; let expected_queues = if self.event_queue_enabled() { 4 } else { 3 };
if queues.len() != expected_queues || queue_evts.len() != expected_queues { if queues.len() != expected_queues || queue_evts.len() != expected_queues {
return; return;
@ -707,12 +769,18 @@ impl VirtioDevice for Balloon {
self.kill_evt = Some(self_kill_evt); self.kill_evt = Some(self_kill_evt);
let state = self.state.clone(); let state = self.state.clone();
let command_tube = match self.command_tube {
Some(ref tube) => tube,
None => {
panic!("Command tube missing!");
}
};
#[allow(deprecated)] #[allow(deprecated)]
let command_tube = match self.command_tube.try_clone() { let command_tube = match command_tube.try_clone() {
Ok(tube) => tube, Ok(tube) => tube,
Err(e) => { Err(e) => {
error!("failed to clone command tube {:?}", e); panic!("failed to clone command tube {:?}", e);
return;
} }
}; };
let inflate_tube = self.inflate_tube.take(); let inflate_tube = self.inflate_tube.take();
@ -733,7 +801,7 @@ impl VirtioDevice for Balloon {
match worker_result { match worker_result {
Err(e) => { Err(e) => {
error!("failed to spawn virtio_balloon worker: {}", e); panic!("failed to spawn virtio_balloon worker: {}", e);
} }
Ok(join_handle) => { Ok(join_handle) => {
self.worker_thread = Some(join_handle); self.worker_thread = Some(join_handle);

View file

@ -17,7 +17,6 @@ use std::os::unix::prelude::OpenOptionsExt;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{mpsc, Arc, Barrier}; use std::sync::{mpsc, Arc, Barrier};
use std::time::Duration;
use std::process; use std::process;
#[cfg(all(target_arch = "x86_64", feature = "gdb"))] #[cfg(all(target_arch = "x86_64", feature = "gdb"))]
@ -1135,10 +1134,6 @@ where
// Balloon gets a special socket so balloon requests can be forwarded // Balloon gets a special socket so balloon requests can be forwarded
// from the main process. // from the main process.
let (host, device) = Tube::pair().context("failed to create tube")?; let (host, device) = Tube::pair().context("failed to create tube")?;
// Set recv timeout to avoid deadlock on sending BalloonControlCommand
// before the guest is ready.
host.set_recv_timeout(Some(Duration::from_millis(100)))
.context("failed to set timeout")?;
(Some(host), Some(device)) (Some(host), Some(device))
} }
} else { } else {

View file

@ -36,10 +36,10 @@ pub use balloon_control::BalloonStats;
use balloon_control::{BalloonTubeCommand, BalloonTubeResult}; use balloon_control::{BalloonTubeCommand, BalloonTubeResult};
use base::{ use base::{
error, with_as_descriptor, AsRawDescriptor, Error as SysError, Event, ExternalMapping, error, trace, warn, with_as_descriptor, AsRawDescriptor, Error as SysError, Event,
FromRawDescriptor, IntoRawDescriptor, Killable, MappedRegion, MemoryMappingArena, ExternalMapping, FromRawDescriptor, IntoRawDescriptor, Killable, MappedRegion,
MemoryMappingBuilder, MemoryMappingBuilderUnix, MmapError, Protection, Result, SafeDescriptor, MemoryMappingArena, MemoryMappingBuilder, MemoryMappingBuilderUnix, MmapError, Protection,
SharedMemory, Tube, SIGRTMIN, Result, SafeDescriptor, SharedMemory, Tube, SIGRTMIN,
}; };
use hypervisor::{IrqRoute, IrqSource, Vm}; use hypervisor::{IrqRoute, IrqSource, Vm};
use resources::{Alloc, MmioType, SystemAllocator}; use resources::{Alloc, MmioType, SystemAllocator};
@ -1134,6 +1134,15 @@ impl VmRequest {
balloon_actual, balloon_actual,
}; };
} }
Ok(BalloonTubeResult::NotReady { id }) => {
if sent_id != id {
trace!("Wrong id for balloon stats");
// Keep trying to get the fresh stats.
continue;
}
warn!("balloon device not ready");
break VmResponse::Err(SysError::new(libc::EAGAIN));
}
Err(e) => { Err(e) => {
error!("balloon socket recv failed: {}", e); error!("balloon socket recv failed: {}", e);
break VmResponse::Err(SysError::last()); break VmResponse::Err(SysError::last());