diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs index cd88bbf25e..db23909c6a 100644 --- a/src/plugin/mod.rs +++ b/src/plugin/mod.rs @@ -25,10 +25,9 @@ use protobuf::ProtobufError; use io_jail::{self, Minijail}; use kvm::{Kvm, Vm, Vcpu, VcpuExit, IoeventAddress, NoDatamatch}; use net_util::{Error as TapError, Tap, TapT}; -use sys_util::{EventFd, MmapError, Killable, SignalFd, SignalFdError, Poller, Pollable, - GuestMemory, Result as SysResult, Error as SysError, - register_signal_handler, block_signal, clear_signal, SIGRTMIN, - geteuid, getegid}; +use sys_util::{EventFd, MmapError, Killable, SignalFd, SignalFdError, PollContext, PollToken, + GuestMemory, Result as SysResult, Error as SysError, block_signal, clear_signal, + SIGRTMIN, register_signal_handler, geteuid, getegid}; use Config; @@ -47,6 +46,7 @@ pub enum Error { CreateJail(io_jail::Error), CreateKvm(SysError), CreateMainSocket(SysError), + CreatePollContext(SysError), CreateSignalFd(SignalFdError), CreateSocketPair(io::Error), CreateVcpu(SysError), @@ -74,6 +74,7 @@ pub enum Error { PluginTimeout, PluginWait(SysError), Poll(SysError), + PollContextAdd(SysError), SetGidMap(io_jail::Error), SetUidMap(io_jail::Error), SigChild { @@ -103,6 +104,7 @@ impl fmt::Display for Error { Error::CreateMainSocket(ref e) => { write!(f, "error creating main request socket: {:?}", e) } + Error::CreatePollContext(ref e) => write!(f, "failed to create poll context: {:?}", e), Error::CreateSignalFd(ref e) => write!(f, "failed to create signalfd: {:?}", e), Error::CreateSocketPair(ref e) => write!(f, "failed to create socket pair: {}", e), Error::CreateVcpu(ref e) => write!(f, "error creating vcpu: {:?}", e), @@ -138,6 +140,7 @@ impl fmt::Display for Error { Error::PluginTimeout => write!(f, "plugin did not exit within timeout"), Error::PluginWait(ref e) => write!(f, "error waiting for plugin to exit: {:?}", e), Error::Poll(ref e) => write!(f, "failed to poll all FDs: {:?}", e), + Error::PollContextAdd(ref e) => write!(f, "failed to add fd to poll context: {:?}", e), Error::SetGidMap(ref e) => write!(f, "failed to set gidmap for jail: {}", e), Error::SetUidMap(ref e) => write!(f, "failed to set uidmap for jail: {}", e), Error::SigChild { @@ -397,6 +400,13 @@ pub fn run_vcpus(kvm: &Kvm, Ok(()) } +#[derive(PollToken)] +enum Token { + Exit, + ChildSignal, + Plugin { index: usize }, +} + /// Run a VM with a plugin process specified by `cfg`. /// /// Not every field of `cfg` will be used. In particular, most field that pertain to a specific @@ -448,8 +458,7 @@ pub fn run_config(cfg: Config) -> Result<()> { let kvm = Kvm::new().map_err(Error::CreateKvm)?; let mut vm = Vm::new(&kvm, mem).map_err(Error::CreateVm)?; vm.create_irq_chip().map_err(Error::CreateIrqChip)?; - let mut plugin = Process::new(vcpu_count, &kvm, &mut vm, - plugin_path, &plugin_args, jail, tap_opt)?; + let mut plugin = Process::new(vcpu_count, plugin_path, &plugin_args, jail)?; let mut res = Ok(()); // If Some, we will exit after enough time is passed to shutdown cleanly. @@ -459,27 +468,17 @@ pub fn run_config(cfg: Config) -> Result<()> { let exit_evt = EventFd::new().map_err(Error::CreateEventFd)?; let kill_signaled = Arc::new(AtomicBool::new(false)); let mut vcpu_handles = Vec::with_capacity(vcpu_count as usize); - // It's possible that the plugin failed to indicate that it wanted the VM to start. We don't - // want to start VCPUs in such a case. - if plugin.is_started() { - res = run_vcpus(&kvm, - &vm, - &plugin, - vcpu_count, - &kill_signaled, - &exit_evt, - &mut vcpu_handles); - if res.is_err() { - dying_instant.get_or_insert(Instant::now()); - } - } else { - // If the plugin has not started by the time the process constructor returns, it's too late, - // and we start the clock on winding things down. - dying_instant.get_or_insert(Instant::now()); - } + + let poll_ctx = PollContext::new().map_err(Error::CreatePollContext)?; + poll_ctx + .add(&exit_evt, Token::Exit) + .map_err(Error::PollContextAdd)?; + poll_ctx + .add(&sigchld_fd, Token::ChildSignal) + .map_err(Error::PollContextAdd)?; let mut sockets_to_drop = Vec::new(); - let mut poller = Poller::new(3); + let mut redo_poll_ctx_sockets = true; // In this loop, make every attempt to not return early. If an error is encountered, set `res` // to the error, set `dying_instant` to now, and signal the plugin that it will be killed soon. // If the plugin cannot be singaled because it is dead of `signal_kill` failed, simply break @@ -492,25 +491,19 @@ pub fn run_config(cfg: Config) -> Result<()> { break; } - const EXIT: u32 = 0; - const CHILD_SIGNAL: u32 = 1; - const PLUGIN_BASE: u32 = 2; - let tokens = { - let mut pollables = Vec::new(); - // No need to check the exit event if we are already doing cleanup. - if dying_instant.is_none() { - pollables.push((EXIT, &exit_evt as &Pollable)); - } - pollables.push((CHILD_SIGNAL, &sigchld_fd as &Pollable)); - for (i, socket) in plugin.sockets().iter().enumerate() { - pollables.push((PLUGIN_BASE + i as u32, socket as &Pollable)); + if redo_poll_ctx_sockets { + for (index, socket) in plugin.sockets().iter().enumerate() { + poll_ctx + .add(socket, Token::Plugin { index }) + .map_err(Error::PollContextAdd)?; } + } + let plugin_socket_count = plugin.sockets().len(); + let events = { let poll_res = match dying_instant { - Some(ref inst) => { - poller.poll_timeout(&pollables[..], &mut (duration_to_die - inst.elapsed())) - } - None => poller.poll(&pollables[..]), + Some(ref inst) => poll_ctx.wait_timeout(duration_to_die - inst.elapsed()), + None => poll_ctx.wait(), }; match poll_res { Ok(v) => v, @@ -523,16 +516,18 @@ pub fn run_config(cfg: Config) -> Result<()> { } } }; - for &token in tokens { - match token { - EXIT => { + for event in events.iter_readable() { + match event.token() { + Token::Exit => { + // No need to check the exit event if we are already doing cleanup. + let _ = poll_ctx.delete(&exit_evt); dying_instant.get_or_insert(Instant::now()); let sig_res = plugin.signal_kill(); if res.is_ok() && sig_res.is_err() { res = sig_res.map_err(Error::PluginKill); } } - CHILD_SIGNAL => { + Token::ChildSignal => { // Print all available siginfo structs, then exit the loop. loop { match sigchld_fd.read() { @@ -572,30 +567,55 @@ pub fn run_config(cfg: Config) -> Result<()> { res = sig_res.map_err(Error::PluginKill); } } - t if t >= PLUGIN_BASE && t < PLUGIN_BASE + (plugin.sockets().len() as u32) => { - let socket_index = (t - PLUGIN_BASE) as usize; - match plugin.handle_socket(socket_index, &kvm, &mut vm, &vcpu_handles) { + Token::Plugin { index } => { + match plugin.handle_socket(index, + &kvm, + &mut vm, + &vcpu_handles, + tap_opt.as_ref()) { Ok(_) => {} // A HUP is an expected event for a socket, so don't bother warning about // it. - Err(Error::PluginSocketHup) => sockets_to_drop.push(socket_index), + Err(Error::PluginSocketHup) => sockets_to_drop.push(index), // Only one connection out of potentially many is broken. Drop it, but don't // start cleaning up. Because the error isn't returned, we will warn about // it here. Err(e) => { warn!("error handling plugin socket: {}", e); - sockets_to_drop.push(socket_index); + sockets_to_drop.push(index); } } } - _ => {} } } + if vcpu_handles.is_empty() && dying_instant.is_none() && plugin.is_started() { + let res = run_vcpus(&kvm, + &vm, + &plugin, + vcpu_count, + &kill_signaled, + &exit_evt, + &mut vcpu_handles); + if let Err(e) = res { + dying_instant.get_or_insert(Instant::now()); + error!("failed to start vcpus: {}", e); + } + } + + redo_poll_ctx_sockets = !sockets_to_drop.is_empty() || + plugin.sockets().len() != plugin_socket_count; + // Cleanup all of the sockets that we have determined were disconnected or suffered some // other error. plugin.drop_sockets(&mut sockets_to_drop); sockets_to_drop.clear(); + + if redo_poll_ctx_sockets { + for socket in plugin.sockets() { + let _ = poll_ctx.delete(socket); + } + } } // vcpu threads MUST see the kill signaled flag, otherwise they may re-enter the VM. diff --git a/src/plugin/process.rs b/src/plugin/process.rs index 7450fe4e36..a12c214a45 100644 --- a/src/plugin/process.rs +++ b/src/plugin/process.rs @@ -24,8 +24,8 @@ use protobuf::Message; use io_jail::Minijail; use kvm::{Vm, IoeventAddress, NoDatamatch, IrqSource, IrqRoute, dirty_log_bitmap_size}; -use sys_util::{EventFd, MemoryMapping, Killable, Scm, Poller, Pollable, SharedMemory, - GuestAddress, Result as SysResult, Error as SysError}; +use sys_util::{EventFd, MemoryMapping, Killable, Scm, SharedMemory, GuestAddress, + Result as SysResult, Error as SysError, SIGRTMIN}; use plugin_proto::*; use super::*; @@ -58,7 +58,6 @@ pub struct Process { // Resource to sent to plugin kill_evt: EventFd, vcpu_sockets: Vec<(UnixDatagram, UnixDatagram)>, - tap: Option, // Socket Transmission scm: Scm, @@ -77,12 +76,9 @@ impl Process { /// Due to an API limitation in libminijail necessitating that this function set an environment /// variable, this function is not thread-safe. pub fn new(cpu_count: u32, - kvm: &Kvm, - vm: &mut Vm, cmd: &Path, args: &[&str], - jail: Option, - tap: Option) + jail: Option) -> Result { let (request_socket, child_socket) = new_seqpacket_pair().map_err(Error::CreateMainSocket)?; @@ -110,65 +106,20 @@ impl Process { } }; - // Very important to drop the child socket so that the pair will properly hang up if the - // plugin process exits or closes its end. - drop(child_socket); - - let request_sockets = vec![request_socket]; - - let mut plugin = Process { - started: false, - plugin_pid, - request_sockets, - objects: Default::default(), - shared_vcpu_state: Default::default(), - per_vcpu_states, - kill_evt: EventFd::new().map_err(Error::CreateEventFd)?, - vcpu_sockets, - tap, - scm: Scm::new(1), - request_buffer: vec![0; MAX_DATAGRAM_SIZE], - datagram_files: Vec::new(), - response_buffer: Vec::new(), - }; - - plugin.run_until_started(kvm, vm)?; - - Ok(plugin) - } - - - fn run_until_started(&mut self, kvm: &Kvm, vm: &mut Vm) -> Result<()> { - let mut sockets_to_drop = Vec::new(); - let mut poller = Poller::new(1); - while !self.started { - if self.request_sockets.is_empty() { - break; - } - - let tokens = { - let mut pollables = Vec::with_capacity(self.objects.len()); - for (i, socket) in self.request_sockets.iter().enumerate() { - pollables.push((i as u32, socket as &Pollable)); - } - poller - .poll(&pollables[..]) - .map_err(Error::PluginSocketPoll)? - }; - - for &token in tokens { - match self.handle_socket(token as usize, kvm, vm, &[]) { - Ok(_) => {} - Err(Error::PluginSocketHup) => sockets_to_drop.push(token as usize), - r => return r, - } - } - - self.drop_sockets(&mut sockets_to_drop); - sockets_to_drop.clear(); - } - - Ok(()) + Ok(Process { + started: false, + plugin_pid, + request_sockets: vec![request_socket], + objects: Default::default(), + shared_vcpu_state: Default::default(), + per_vcpu_states, + kill_evt: EventFd::new().map_err(Error::CreateEventFd)?, + vcpu_sockets, + scm: Scm::new(1), + request_buffer: vec![0; MAX_DATAGRAM_SIZE], + datagram_files: Vec::new(), + response_buffer: Vec::new(), + }) } /// Creates a VCPU plugin connection object, used by a VCPU run loop to communicate with the @@ -422,7 +373,8 @@ impl Process { index: usize, kvm: &Kvm, vm: &mut Vm, - vcpu_handles: &[JoinHandle<()>]) + vcpu_handles: &[JoinHandle<()>], + tap: Option<&Tap>) -> Result<()> { let msg_size = self.scm .recv(&self.request_sockets[index], @@ -557,15 +509,16 @@ impl Process { Ok(()) } } else if request.has_get_net_config() { - match self.tap { - Some(ref tap) => match Self::handle_get_net_config(tap, - response.mut_get_net_config()) { - Ok(_) => { - response_fds.push(tap.as_raw_fd()); - Ok(()) - }, - Err(e) => Err(e), - }, + match tap { + Some(tap) => { + match Self::handle_get_net_config(tap, response.mut_get_net_config()) { + Ok(_) => { + response_fds.push(tap.as_raw_fd()); + Ok(()) + } + Err(e) => Err(e), + } + } None => Err(SysError::new(-ENODATA)), } } else if request.has_dirty_log() { diff --git a/sys_util/src/signalfd.rs b/sys_util/src/signalfd.rs index dc08038146..62e9ba93b9 100644 --- a/sys_util/src/signalfd.rs +++ b/sys_util/src/signalfd.rs @@ -108,6 +108,12 @@ unsafe impl ::Pollable for SignalFd { } } +impl AsRawFd for SignalFd { + fn as_raw_fd(&self) -> RawFd { + self.signalfd.as_raw_fd() + } +} + impl Drop for SignalFd { fn drop(&mut self) { // This is thread-safe and safe in the sense that we're doing what