diff --git a/base/src/sys/unix/mod.rs b/base/src/sys/unix/mod.rs index 9c7f358367..7c9f7e28d1 100644 --- a/base/src/sys/unix/mod.rs +++ b/base/src/sys/unix/mod.rs @@ -668,8 +668,6 @@ pub fn number_of_logical_cores() -> Result { mod tests { use std::io::Write; - use libc::EBADF; - use super::*; #[test] @@ -682,35 +680,4 @@ mod tests { tx.write(&[0u8; 8]) .expect_err("Write after fill didn't fail"); } - - #[test] - fn safe_descriptor_from_path_valid() { - assert!(safe_descriptor_from_path(Path::new("/proc/self/fd/2")) - .unwrap() - .is_some()); - } - - #[test] - fn safe_descriptor_from_path_invalid_integer() { - assert_eq!( - safe_descriptor_from_path(Path::new("/proc/self/fd/blah")), - Err(Error::new(EINVAL)) - ); - } - - #[test] - fn safe_descriptor_from_path_invalid_fd() { - assert_eq!( - safe_descriptor_from_path(Path::new("/proc/self/fd/42")), - Err(Error::new(EBADF)) - ); - } - - #[test] - fn safe_descriptor_from_path_none() { - assert_eq!( - safe_descriptor_from_path(Path::new("/something/else")).unwrap(), - None - ); - } } diff --git a/base/src/sys/unix/net.rs b/base/src/sys/unix/net.rs index db7d11df71..3dcd286707 100644 --- a/base/src/sys/unix/net.rs +++ b/base/src/sys/unix/net.rs @@ -930,16 +930,8 @@ impl Drop for UnlinkUnixSeqpacketListener { #[cfg(test)] mod tests { - use std::env; - use std::io::ErrorKind; - use std::path::PathBuf; - use super::*; - fn tmpdir() -> PathBuf { - env::temp_dir() - } - #[test] fn sockaddr_un_zero_length_input() { let _res = sockaddr_un(Path::new("")).expect("sockaddr_un failed"); @@ -984,231 +976,4 @@ mod tests { assert_eq!(addr_char, ref_char); } } - - #[test] - fn unix_seqpacket_path_not_exists() { - let res = UnixSeqpacket::connect("/path/not/exists"); - assert!(res.is_err()); - } - - #[test] - fn unix_seqpacket_listener_path() { - let mut socket_path = tmpdir(); - socket_path.push("unix_seqpacket_listener_path"); - let listener = UnlinkUnixSeqpacketListener( - UnixSeqpacketListener::bind(&socket_path) - .expect("failed to create UnixSeqpacketListener"), - ); - let listener_path = listener.path().expect("failed to get socket listener path"); - assert_eq!(socket_path, listener_path); - } - - #[test] - fn unix_seqpacket_listener_from_fd() { - let mut socket_path = tmpdir(); - socket_path.push("unix_seqpacket_listener_from_fd"); - let listener = UnlinkUnixSeqpacketListener( - UnixSeqpacketListener::bind(&socket_path) - .expect("failed to create UnixSeqpacketListener"), - ); - // UnixSeqpacketListener should succeed on a valid listening descriptor. - let good_dup = UnixSeqpacketListener::bind(&format!("/proc/self/fd/{}", unsafe { - libc::dup(listener.as_raw_fd()) - })); - let good_dup_path = good_dup - .expect("failed to create dup UnixSeqpacketListener") - .path(); - // Path of socket created by descriptor should be hidden. - assert!(good_dup_path.is_err()); - // UnixSeqpacketListener must fail on an existing non-listener socket. - let s1 = - UnixSeqpacket::connect(socket_path.as_path()).expect("UnixSeqpacket::connect failed"); - let bad_dup = UnixSeqpacketListener::bind(&format!("/proc/self/fd/{}", unsafe { - libc::dup(s1.as_raw_fd()) - })); - assert!(bad_dup.is_err()); - } - - #[test] - fn unix_seqpacket_path_exists_pass() { - let mut socket_path = tmpdir(); - socket_path.push("path_to_socket"); - let _listener = UnlinkUnixSeqpacketListener( - UnixSeqpacketListener::bind(&socket_path) - .expect("failed to create UnixSeqpacketListener"), - ); - let _res = - UnixSeqpacket::connect(socket_path.as_path()).expect("UnixSeqpacket::connect failed"); - } - - #[test] - fn unix_seqpacket_path_listener_accept_with_timeout() { - let mut socket_path = tmpdir(); - socket_path.push("path_listerner_accept_with_timeout"); - let listener = UnlinkUnixSeqpacketListener( - UnixSeqpacketListener::bind(&socket_path) - .expect("failed to create UnixSeqpacketListener"), - ); - - for d in [Duration::from_millis(10), Duration::ZERO] { - let _ = listener.accept_with_timeout(d).expect_err(&format!( - "UnixSeqpacket::accept_with_timeout {:?} connected", - d - )); - - let s1 = UnixSeqpacket::connect(socket_path.as_path()) - .unwrap_or_else(|_| panic!("UnixSeqpacket::connect {:?} failed", d)); - - let s2 = listener - .accept_with_timeout(d) - .unwrap_or_else(|_| panic!("UnixSeqpacket::accept {:?} failed", d)); - - let data1 = &[0, 1, 2, 3, 4]; - let data2 = &[10, 11, 12, 13, 14]; - s2.send(data2).expect("failed to send data2"); - s1.send(data1).expect("failed to send data1"); - let recv_data = &mut [0; 5]; - s2.recv(recv_data).expect("failed to recv data"); - assert_eq!(data1, recv_data); - s1.recv(recv_data).expect("failed to recv data"); - assert_eq!(data2, recv_data); - } - } - - #[test] - fn unix_seqpacket_path_listener_accept() { - let mut socket_path = tmpdir(); - socket_path.push("path_listerner_accept"); - let listener = UnlinkUnixSeqpacketListener( - UnixSeqpacketListener::bind(&socket_path) - .expect("failed to create UnixSeqpacketListener"), - ); - let s1 = - UnixSeqpacket::connect(socket_path.as_path()).expect("UnixSeqpacket::connect failed"); - - let s2 = listener.accept().expect("UnixSeqpacket::accept failed"); - - let data1 = &[0, 1, 2, 3, 4]; - let data2 = &[10, 11, 12, 13, 14]; - s2.send(data2).expect("failed to send data2"); - s1.send(data1).expect("failed to send data1"); - let recv_data = &mut [0; 5]; - s2.recv(recv_data).expect("failed to recv data"); - assert_eq!(data1, recv_data); - s1.recv(recv_data).expect("failed to recv data"); - assert_eq!(data2, recv_data); - } - - #[test] - fn unix_seqpacket_zero_timeout() { - let (s1, _s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); - // Timeouts less than a microsecond are too small and round to zero. - s1.set_read_timeout(Some(Duration::from_nanos(10))) - .expect_err("successfully set zero timeout"); - } - - #[test] - fn unix_seqpacket_read_timeout() { - let (s1, _s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); - s1.set_read_timeout(Some(Duration::from_millis(1))) - .expect("failed to set read timeout for socket"); - let _ = s1.recv(&mut [0]); - } - - #[test] - fn unix_seqpacket_write_timeout() { - let (s1, _s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); - s1.set_write_timeout(Some(Duration::from_millis(1))) - .expect("failed to set write timeout for socket"); - } - - #[test] - fn unix_seqpacket_send_recv() { - let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); - let data1 = &[0, 1, 2, 3, 4]; - let data2 = &[10, 11, 12, 13, 14]; - s2.send(data2).expect("failed to send data2"); - s1.send(data1).expect("failed to send data1"); - let recv_data = &mut [0; 5]; - s2.recv(recv_data).expect("failed to recv data"); - assert_eq!(data1, recv_data); - s1.recv(recv_data).expect("failed to recv data"); - assert_eq!(data2, recv_data); - } - - #[test] - fn unix_seqpacket_send_fragments() { - let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); - let data1 = &[0, 1, 2, 3, 4]; - let data2 = &[10, 11, 12, 13, 14, 15, 16]; - s1.send(data1).expect("failed to send data1"); - s1.send(data2).expect("failed to send data2"); - - let recv_data = &mut [0; 32]; - let size = s2.recv(recv_data).expect("failed to recv data"); - assert_eq!(size, data1.len()); - assert_eq!(data1, &recv_data[0..size]); - - let size = s2.recv(recv_data).expect("failed to recv data"); - assert_eq!(size, data2.len()); - assert_eq!(data2, &recv_data[0..size]); - } - - #[test] - fn unix_seqpacket_get_readable_bytes() { - let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); - assert_eq!(s1.get_readable_bytes().unwrap(), 0); - assert_eq!(s2.get_readable_bytes().unwrap(), 0); - let data1 = &[0, 1, 2, 3, 4]; - s1.send(data1).expect("failed to send data"); - - assert_eq!(s1.get_readable_bytes().unwrap(), 0); - assert_eq!(s2.get_readable_bytes().unwrap(), data1.len()); - - let recv_data = &mut [0; 5]; - s2.recv(recv_data).expect("failed to recv data"); - assert_eq!(s1.get_readable_bytes().unwrap(), 0); - assert_eq!(s2.get_readable_bytes().unwrap(), 0); - } - - #[test] - fn unix_seqpacket_next_packet_size() { - let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); - let data1 = &[0, 1, 2, 3, 4]; - s1.send(data1).expect("failed to send data"); - - assert_eq!(s2.next_packet_size().unwrap(), 5); - s1.set_read_timeout(Some(Duration::from_micros(1))) - .expect("failed to set read timeout"); - assert_eq!( - s1.next_packet_size().unwrap_err().kind(), - ErrorKind::WouldBlock - ); - drop(s2); - assert_eq!( - s1.next_packet_size().unwrap_err().kind(), - ErrorKind::ConnectionReset - ); - } - - #[test] - fn unix_seqpacket_recv_to_vec() { - let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); - let data1 = &[0, 1, 2, 3, 4]; - s1.send(data1).expect("failed to send data"); - - let recv_data = &mut vec![]; - s2.recv_to_vec(recv_data).expect("failed to recv data"); - assert_eq!(recv_data, &mut vec![0, 1, 2, 3, 4]); - } - - #[test] - fn unix_seqpacket_recv_as_vec() { - let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); - let data1 = &[0, 1, 2, 3, 4]; - s1.send(data1).expect("failed to send data"); - - let recv_data = s2.recv_as_vec().expect("failed to recv data"); - assert_eq!(recv_data, vec![0, 1, 2, 3, 4]); - } } diff --git a/base/src/sys/unix/scoped_signal_handler.rs b/base/src/sys/unix/scoped_signal_handler.rs index 7411126eac..0f759b56c8 100644 --- a/base/src/sys/unix/scoped_signal_handler.rs +++ b/base/src/sys/unix/scoped_signal_handler.rs @@ -176,248 +176,3 @@ pub fn wait_for_interrupt() -> Result<()> { Err(err) => Err(Error::WaitForSignal(err)), } } - -#[cfg(test)] -mod tests { - use std::fs::File; - use std::io::BufRead; - use std::io::BufReader; - use std::mem::zeroed; - use std::ptr::null; - use std::ptr::null_mut; - use std::sync::atomic::AtomicI32; - use std::sync::atomic::AtomicUsize; - use std::sync::atomic::Ordering; - use std::sync::Arc; - use std::sync::Mutex; - use std::sync::MutexGuard; - use std::sync::Once; - use std::thread::sleep; - use std::thread::spawn; - use std::time::Duration; - use std::time::Instant; - - use libc::sigaction; - - use super::super::gettid; - use super::super::kill; - use super::super::Pid; - use super::*; - - const TEST_SIGNAL: Signal = Signal::User1; - const TEST_SIGNALS: &[Signal] = &[Signal::User1, Signal::User2]; - - static TEST_SIGNAL_COUNTER: AtomicUsize = AtomicUsize::new(0); - - /// Only allows one test case to execute at a time. - fn get_mutex() -> MutexGuard<'static, ()> { - static INIT: Once = Once::new(); - static mut VAL: Option>> = None; - - INIT.call_once(|| { - let val = Some(Arc::new(Mutex::new(()))); - // Safe because the mutation is protected by the Once. - unsafe { VAL = val } - }); - - // Safe mutation only happens in the Once. - unsafe { VAL.as_ref() }.unwrap().lock().unwrap() - } - - fn reset_counter() { - TEST_SIGNAL_COUNTER.swap(0, Ordering::SeqCst); - } - - fn get_sigaction(signal: Signal) -> Result { - // Safe because sigaction is owned and expected to be initialized ot zeros. - let mut sigact: sigaction = unsafe { zeroed() }; - - if unsafe { sigaction(signal.into(), null(), &mut sigact) } < 0 { - Err(Error::Sigaction(signal, ErrnoError::last())) - } else { - Ok(sigact) - } - } - - /// Safety: - /// This is only safe if the signal handler set in sigaction is safe. - unsafe fn restore_sigaction(signal: Signal, sigact: sigaction) -> Result { - if sigaction(signal.into(), &sigact, null_mut()) < 0 { - Err(Error::Sigaction(signal, ErrnoError::last())) - } else { - Ok(sigact) - } - } - - /// Safety: - /// Safe if the signal handler for Signal::User1 is safe. - unsafe fn send_test_signal() { - kill(gettid(), Signal::User1.into()).unwrap() - } - - macro_rules! assert_counter_eq { - ($compare_to:expr) => {{ - let expected: usize = $compare_to; - let got: usize = TEST_SIGNAL_COUNTER.load(Ordering::SeqCst); - if got != expected { - panic!( - "wrong signal counter value: got {}; expected {}", - got, expected - ); - } - }}; - } - - struct TestHandler; - - /// # Safety - /// Safe because handle_signal is async-signal safe. - unsafe impl SignalHandler for TestHandler { - fn handle_signal(signal: Signal) { - if TEST_SIGNAL == signal { - TEST_SIGNAL_COUNTER.fetch_add(1, Ordering::SeqCst); - } - } - } - - #[test] - fn scopedsignalhandler_success() { - // Prevent other test cases from running concurrently since the signal - // handlers are shared for the process. - let _guard = get_mutex(); - - reset_counter(); - assert_counter_eq!(0); - - assert!(has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); - let handler = ScopedSignalHandler::new::(&[TEST_SIGNAL]).unwrap(); - assert!(!has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); - - // Safe because test_handler is safe. - unsafe { send_test_signal() }; - - // Give the handler time to run in case it is on a different thread. - for _ in 1..40 { - if TEST_SIGNAL_COUNTER.load(Ordering::SeqCst) > 0 { - break; - } - sleep(Duration::from_millis(250)); - } - - assert_counter_eq!(1); - - drop(handler); - assert!(has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); - } - - #[test] - fn scopedsignalhandler_handleralreadyset() { - // Prevent other test cases from running concurrently since the signal - // handlers are shared for the process. - let _guard = get_mutex(); - - reset_counter(); - assert_counter_eq!(0); - - assert!(has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); - // Safe because TestHandler is async-signal safe. - let handler = ScopedSignalHandler::new::(&[TEST_SIGNAL]).unwrap(); - assert!(!has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); - - // Safe because TestHandler is async-signal safe. - assert!(matches!( - ScopedSignalHandler::new::(TEST_SIGNALS), - Err(Error::HandlerAlreadySet(Signal::User1)) - )); - - assert_counter_eq!(0); - drop(handler); - assert!(has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); - } - - /// Stores the thread used by WaitForInterruptHandler. - static WAIT_FOR_INTERRUPT_THREAD_ID: AtomicI32 = AtomicI32::new(0); - /// Forwards SIGINT to the appropriate thread. - struct WaitForInterruptHandler; - - /// # Safety - /// Safe because handle_signal is async-signal safe. - unsafe impl SignalHandler for WaitForInterruptHandler { - fn handle_signal(_: Signal) { - let tid = WAIT_FOR_INTERRUPT_THREAD_ID.load(Ordering::SeqCst); - // If the thread ID is set and executed on the wrong thread, forward the signal. - if tid != 0 && gettid() != tid { - // Safe because the handler is safe and the target thread id is expecting the signal. - unsafe { kill(tid, Signal::Interrupt.into()) }.unwrap(); - } - } - } - - /// Query /proc/${tid}/status for its State and check if it is either S (sleeping) or in - /// D (disk sleep). - fn thread_is_sleeping(tid: Pid) -> result::Result { - const PREFIX: &str = "State:"; - let mut status_reader = BufReader::new(File::open(format!("/proc/{}/status", tid))?); - let mut line = String::new(); - loop { - let count = status_reader.read_line(&mut line)?; - if count == 0 { - return Err(ErrnoError::new(libc::EIO)); - } - if let Some(stripped) = line.strip_prefix(PREFIX) { - return Ok(matches!( - stripped.trim_start().chars().next(), - Some('S') | Some('D') - )); - } - line.clear(); - } - } - - /// Wait for a process to block either in a sleeping or disk sleep state. - fn wait_for_thread_to_sleep(tid: Pid, timeout: Duration) -> result::Result<(), ErrnoError> { - let start = Instant::now(); - loop { - if thread_is_sleeping(tid)? { - return Ok(()); - } - if start.elapsed() > timeout { - return Err(ErrnoError::new(libc::EAGAIN)); - } - sleep(Duration::from_millis(50)); - } - } - - #[test] - fn waitforinterrupt_success() { - // Prevent other test cases from running concurrently since the signal - // handlers are shared for the process. - let _guard = get_mutex(); - - let to_restore = get_sigaction(Signal::Interrupt).unwrap(); - clear_signal_handler(Signal::Interrupt.into()).unwrap(); - // Safe because TestHandler is async-signal safe. - let handler = - ScopedSignalHandler::new::(&[Signal::Interrupt]).unwrap(); - - let tid = gettid(); - WAIT_FOR_INTERRUPT_THREAD_ID.store(tid, Ordering::SeqCst); - - let join_handle = spawn(move || -> result::Result<(), ErrnoError> { - // Wait unitl the thread is ready to receive the signal. - wait_for_thread_to_sleep(tid, Duration::from_secs(10)).unwrap(); - - // Safe because the SIGINT handler is safe. - unsafe { kill(tid, Signal::Interrupt.into()) } - }); - let wait_ret = wait_for_interrupt(); - let join_ret = join_handle.join(); - - drop(handler); - // Safe because we are restoring the previous SIGINT handler. - unsafe { restore_sigaction(Signal::Interrupt, to_restore) }.unwrap(); - - wait_ret.unwrap(); - join_ret.unwrap().unwrap(); - } -} diff --git a/base/src/sys/unix/syslog.rs b/base/src/sys/unix/syslog.rs index 3b0b379cc2..469deb1ecf 100644 --- a/base/src/sys/unix/syslog.rs +++ b/base/src/sys/unix/syslog.rs @@ -3,52 +3,3 @@ // found in the LICENSE file. pub use super::target_os::syslog::PlatformSyslog; -pub use super::RawDescriptor; - -#[cfg(test)] -mod tests { - use std::io::Read; - use std::io::Seek; - use std::io::SeekFrom; - - use crate::syslog::*; - - #[test] - fn fds() { - ensure_inited().unwrap(); - let mut fds = Vec::new(); - push_descriptors(&mut fds); - assert!(!fds.is_empty()); - for fd in fds { - assert!(fd >= 0); - } - } - - #[test] - fn syslog_file() { - ensure_inited().unwrap(); - let mut file = tempfile::tempfile().expect("failed to create tempfile"); - - let syslog_file = file.try_clone().expect("error cloning shared memory file"); - let state = State::new(LogConfig { - pipe: Some(Box::new(syslog_file)), - ..Default::default() - }) - .unwrap(); - - const TEST_STR: &str = "hello shared memory file"; - state.log( - &log::RecordBuilder::new() - .level(Level::Error) - .args(format_args!("{}", TEST_STR)) - .build(), - ); - - file.seek(SeekFrom::Start(0)) - .expect("error seeking shared memory file"); - let mut buf = String::new(); - file.read_to_string(&mut buf) - .expect("error reading shared memory file"); - assert!(buf.contains(TEST_STR)); - } -} diff --git a/base/src/sys/unix/tube.rs b/base/src/sys/unix/tube.rs index 69a142105a..8d081e54b9 100644 --- a/base/src/sys/unix/tube.rs +++ b/base/src/sys/unix/tube.rs @@ -191,93 +191,3 @@ impl AsRawDescriptor for RecvTube { self.0.as_raw_descriptor() } } - -#[cfg(test)] -mod test { - use std::time; - - use super::*; - use crate::EventContext; - use crate::EventToken; - use crate::ReadNotifier; - - #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)] - enum Token { - ReceivedData, - } - - const EVENT_WAIT_TIME: time::Duration = time::Duration::from_secs(10); - - #[test] - fn test_serialize_tube_new() { - let (sock_send, sock_recv) = - StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Message).unwrap(); - let tube_send = Tube::new(sock_send).unwrap(); - let tube_recv = Tube::new(sock_recv).unwrap(); - - // Serialize the Tube - let msg_serialize = SerializeDescriptors::new(&tube_send); - let serialized = serde_json::to_vec(&msg_serialize).unwrap(); - let msg_descriptors = msg_serialize.into_descriptors(); - - // Deserialize the Tube - let mut msg_descriptors_safe = msg_descriptors - .into_iter() - .map(|v| Some(unsafe { SafeDescriptor::from_raw_descriptor(v) })) - .collect(); - let tube_deserialized: Tube = deserialize_with_descriptors( - || serde_json::from_slice(&serialized), - &mut msg_descriptors_safe, - ) - .unwrap(); - - // Send a message through deserialized Tube - tube_deserialized.send(&"hi".to_string()).unwrap(); - - // Wait for the message to arrive - let event_ctx: EventContext = - EventContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]) - .unwrap(); - let events = event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap(); - let tokens: Vec = events - .iter() - .filter(|e| e.is_readable) - .map(|e| e.token) - .collect(); - assert_eq!(tokens, vec! {Token::ReceivedData}); - - assert_eq!(tube_recv.recv::().unwrap(), "hi"); - } - - #[test] - fn test_send_recv_new_from_seqpacket() { - let (sock_send, sock_recv) = UnixSeqpacket::pair().unwrap(); - let tube_send = Tube::new_from_unix_seqpacket(sock_send); - let tube_recv = Tube::new_from_unix_seqpacket(sock_recv); - - tube_send.send(&"hi".to_string()).unwrap(); - - // Wait for the message to arrive - let event_ctx: EventContext = - EventContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]) - .unwrap(); - let events = event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap(); - let tokens: Vec = events - .iter() - .filter(|e| e.is_readable) - .map(|e| e.token) - .collect(); - assert_eq!(tokens, vec! {Token::ReceivedData}); - - assert_eq!(tube_recv.recv::().unwrap(), "hi"); - } - - #[test] - fn test_tube_new_byte_mode_error() { - let (sock_byte_mode, _) = - StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap(); - let tube_error = Tube::new(sock_byte_mode); - - assert!(tube_error.is_err()); - } -} diff --git a/base/src/syslog.rs b/base/src/syslog.rs index 9e629d22ba..e3654bcaf1 100644 --- a/base/src/syslog.rs +++ b/base/src/syslog.rs @@ -190,7 +190,7 @@ pub(crate) trait Syslog { ) -> Result<(Option>, Option), Error>; } -pub(crate) struct State { +pub struct State { /// Record filter filter: env_logger::filter::Filter, /// All the loggers we have diff --git a/base/src/tube.rs b/base/src/tube.rs index 4b01e20e2e..d85689fb4d 100644 --- a/base/src/tube.rs +++ b/base/src/tube.rs @@ -139,199 +139,3 @@ pub enum Error { } pub type Result = std::result::Result; - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::Arc; - use std::sync::Barrier; - use std::thread; - use std::time::Duration; - - use serde::Deserialize; - use serde::Serialize; - - use super::*; - use crate::descriptor::FromRawDescriptor; - use crate::descriptor::SafeDescriptor; - use crate::platform::deserialize_with_descriptors; - use crate::platform::SerializeDescriptors; - use crate::Event; - use crate::EventToken; - use crate::ReadNotifier; - use crate::WaitContext; - - #[derive(Serialize, Deserialize)] - struct DataStruct { - x: u32, - } - - #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)] - enum Token { - ReceivedData, - } - - // Magics to identify which producer sent a message (& detect corruption). - const PRODUCER_ID_1: u32 = 801279273; - const PRODUCER_ID_2: u32 = 345234861; - - #[track_caller] - fn test_event_pair(send: Event, recv: Event) { - send.signal().unwrap(); - recv.wait_timeout(Duration::from_secs(1)).unwrap(); - } - - #[test] - fn send_recv_no_fd() { - let (s1, s2) = Tube::pair().unwrap(); - - let test_msg = "hello world"; - s1.send(&test_msg).unwrap(); - let recv_msg: String = s2.recv().unwrap(); - - assert_eq!(test_msg, recv_msg); - } - - #[test] - fn send_recv_one_fd() { - #[derive(Serialize, Deserialize)] - struct EventStruct { - x: u32, - b: Event, - } - - let (s1, s2) = Tube::pair().unwrap(); - - let test_msg = EventStruct { - x: 100, - b: Event::new().unwrap(), - }; - s1.send(&test_msg).unwrap(); - let recv_msg: EventStruct = s2.recv().unwrap(); - - assert_eq!(test_msg.x, recv_msg.x); - - test_event_pair(test_msg.b, recv_msg.b); - } - - /// Send messages to a Tube with the given identifier (see `consume_messages`; we use this to - /// track different message producers). - #[track_caller] - fn produce_messages(tube: SendTube, data: u32, barrier: Arc) -> SendTube { - let data = DataStruct { x: data }; - barrier.wait(); - for _ in 0..100 { - tube.send(&data).unwrap(); - } - tube - } - - /// Consumes the given number of messages from a Tube, returning the number messages read with - /// each producer ID. - #[track_caller] - fn consume_messages( - tube: RecvTube, - count: usize, - barrier: Arc, - ) -> (RecvTube, usize, usize) { - barrier.wait(); - - let mut id1_count = 0usize; - let mut id2_count = 0usize; - - for _ in 0..count { - let msg = tube.recv::().unwrap(); - match msg.x { - PRODUCER_ID_1 => id1_count += 1, - PRODUCER_ID_2 => id2_count += 1, - _ => panic!( - "want message with ID {} or {}; got message w/ ID {}.", - PRODUCER_ID_1, PRODUCER_ID_2, msg.x - ), - } - } - (tube, id1_count, id2_count) - } - - #[test] - fn test_serialize_tube_pair() { - let (tube_send, tube_recv) = Tube::pair().unwrap(); - - // Serialize the Tube - let msg_serialize = SerializeDescriptors::new(&tube_send); - let serialized = serde_json::to_vec(&msg_serialize).unwrap(); - let msg_descriptors = msg_serialize.into_descriptors(); - - // Deserialize the Tube - let mut msg_descriptors_safe = msg_descriptors - .into_iter() - .map(|v| Some(unsafe { SafeDescriptor::from_raw_descriptor(v) })) - .collect(); - let tube_deserialized: Tube = deserialize_with_descriptors( - || serde_json::from_slice(&serialized), - &mut msg_descriptors_safe, - ) - .unwrap(); - - // Send a message through deserialized Tube - tube_deserialized.send(&"hi".to_string()).unwrap(); - - // Wait for the message to arrive - let wait_ctx: WaitContext = - WaitContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]) - .unwrap(); - let events = wait_ctx.wait_timeout(Duration::from_secs(10)).unwrap(); - let tokens: Vec = events - .iter() - .filter(|e| e.is_readable) - .map(|e| e.token) - .collect(); - assert_eq!(tokens, vec! {Token::ReceivedData}); - - assert_eq!(tube_recv.recv::().unwrap(), "hi"); - } - - #[test] - fn send_recv_mpsc() { - let (p1, consumer) = Tube::directional_pair().unwrap(); - let p2 = p1.try_clone().unwrap(); - let start_block_p1 = Arc::new(Barrier::new(3)); - let start_block_p2 = start_block_p1.clone(); - let start_block_consumer = start_block_p1.clone(); - - let p1_thread = thread::spawn(move || produce_messages(p1, PRODUCER_ID_1, start_block_p1)); - let p2_thread = thread::spawn(move || produce_messages(p2, PRODUCER_ID_2, start_block_p2)); - - let (_tube, id1_count, id2_count) = consume_messages(consumer, 200, start_block_consumer); - assert_eq!(id1_count, 100); - assert_eq!(id2_count, 100); - - p1_thread.join().unwrap(); - p2_thread.join().unwrap(); - } - - #[test] - fn send_recv_hash_map() { - let (s1, s2) = Tube::pair().unwrap(); - - let mut test_msg = HashMap::new(); - test_msg.insert("Red".to_owned(), Event::new().unwrap()); - test_msg.insert("White".to_owned(), Event::new().unwrap()); - test_msg.insert("Blue".to_owned(), Event::new().unwrap()); - test_msg.insert("Orange".to_owned(), Event::new().unwrap()); - test_msg.insert("Green".to_owned(), Event::new().unwrap()); - s1.send(&test_msg).unwrap(); - let mut recv_msg: HashMap = s2.recv().unwrap(); - - let mut test_msg_keys: Vec<_> = test_msg.keys().collect(); - test_msg_keys.sort(); - let mut recv_msg_keys: Vec<_> = recv_msg.keys().collect(); - recv_msg_keys.sort(); - assert_eq!(test_msg_keys, recv_msg_keys); - - for (key, test_event) in test_msg { - let recv_event = recv_msg.remove(&key).unwrap(); - test_event_pair(test_event, recv_event); - } - } -} diff --git a/base/tests/tube.rs b/base/tests/tube.rs new file mode 100644 index 0000000000..b85a9df9c2 --- /dev/null +++ b/base/tests/tube.rs @@ -0,0 +1,197 @@ +// Copyright 2021 The ChromiumOS Authors +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Barrier; +use std::thread; +use std::time::Duration; + +use base::RecvTube; +use base::SendTube; +use base::Tube; +use serde::Deserialize; +use serde::Serialize; + +use base::descriptor::FromRawDescriptor; +use base::descriptor::SafeDescriptor; +use base::platform::deserialize_with_descriptors; +use base::platform::SerializeDescriptors; +use base::Event; +use base::EventToken; +use base::ReadNotifier; +use base::WaitContext; + +#[derive(Serialize, Deserialize)] +struct DataStruct { + x: u32, +} + +#[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)] +enum Token { + ReceivedData, +} + +// Magics to identify which producer sent a message (& detect corruption). +const PRODUCER_ID_1: u32 = 801279273; +const PRODUCER_ID_2: u32 = 345234861; + +#[track_caller] +fn test_event_pair(send: Event, recv: Event) { + send.signal().unwrap(); + recv.wait_timeout(Duration::from_secs(1)).unwrap(); +} + +#[test] +fn send_recv_no_fd() { + let (s1, s2) = Tube::pair().unwrap(); + + let test_msg = "hello world"; + s1.send(&test_msg).unwrap(); + let recv_msg: String = s2.recv().unwrap(); + + assert_eq!(test_msg, recv_msg); +} + +#[test] +fn send_recv_one_fd() { + #[derive(Serialize, Deserialize)] + struct EventStruct { + x: u32, + b: Event, + } + + let (s1, s2) = Tube::pair().unwrap(); + + let test_msg = EventStruct { + x: 100, + b: Event::new().unwrap(), + }; + s1.send(&test_msg).unwrap(); + let recv_msg: EventStruct = s2.recv().unwrap(); + + assert_eq!(test_msg.x, recv_msg.x); + + test_event_pair(test_msg.b, recv_msg.b); +} + +/// Send messages to a Tube with the given identifier (see `consume_messages`; we use this to +/// track different message producers). +#[track_caller] +fn produce_messages(tube: SendTube, data: u32, barrier: Arc) -> SendTube { + let data = DataStruct { x: data }; + barrier.wait(); + for _ in 0..100 { + tube.send(&data).unwrap(); + } + tube +} + +/// Consumes the given number of messages from a Tube, returning the number messages read with +/// each producer ID. +#[track_caller] +fn consume_messages( + tube: RecvTube, + count: usize, + barrier: Arc, +) -> (RecvTube, usize, usize) { + barrier.wait(); + + let mut id1_count = 0usize; + let mut id2_count = 0usize; + + for _ in 0..count { + let msg = tube.recv::().unwrap(); + match msg.x { + PRODUCER_ID_1 => id1_count += 1, + PRODUCER_ID_2 => id2_count += 1, + _ => panic!( + "want message with ID {} or {}; got message w/ ID {}.", + PRODUCER_ID_1, PRODUCER_ID_2, msg.x + ), + } + } + (tube, id1_count, id2_count) +} + +#[test] +fn test_serialize_tube_pair() { + let (tube_send, tube_recv) = Tube::pair().unwrap(); + + // Serialize the Tube + let msg_serialize = SerializeDescriptors::new(&tube_send); + let serialized = serde_json::to_vec(&msg_serialize).unwrap(); + let msg_descriptors = msg_serialize.into_descriptors(); + + // Deserialize the Tube + let mut msg_descriptors_safe = msg_descriptors + .into_iter() + .map(|v| Some(unsafe { SafeDescriptor::from_raw_descriptor(v) })) + .collect(); + let tube_deserialized: Tube = deserialize_with_descriptors( + || serde_json::from_slice(&serialized), + &mut msg_descriptors_safe, + ) + .unwrap(); + + // Send a message through deserialized Tube + tube_deserialized.send(&"hi".to_string()).unwrap(); + + // Wait for the message to arrive + let wait_ctx: WaitContext = + WaitContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]).unwrap(); + let events = wait_ctx.wait_timeout(Duration::from_secs(10)).unwrap(); + let tokens: Vec = events + .iter() + .filter(|e| e.is_readable) + .map(|e| e.token) + .collect(); + assert_eq!(tokens, vec! {Token::ReceivedData}); + + assert_eq!(tube_recv.recv::().unwrap(), "hi"); +} + +#[test] +fn send_recv_mpsc() { + let (p1, consumer) = Tube::directional_pair().unwrap(); + let p2 = p1.try_clone().unwrap(); + let start_block_p1 = Arc::new(Barrier::new(3)); + let start_block_p2 = start_block_p1.clone(); + let start_block_consumer = start_block_p1.clone(); + + let p1_thread = thread::spawn(move || produce_messages(p1, PRODUCER_ID_1, start_block_p1)); + let p2_thread = thread::spawn(move || produce_messages(p2, PRODUCER_ID_2, start_block_p2)); + + let (_tube, id1_count, id2_count) = consume_messages(consumer, 200, start_block_consumer); + assert_eq!(id1_count, 100); + assert_eq!(id2_count, 100); + + p1_thread.join().unwrap(); + p2_thread.join().unwrap(); +} + +#[test] +fn send_recv_hash_map() { + let (s1, s2) = Tube::pair().unwrap(); + + let mut test_msg = HashMap::new(); + test_msg.insert("Red".to_owned(), Event::new().unwrap()); + test_msg.insert("White".to_owned(), Event::new().unwrap()); + test_msg.insert("Blue".to_owned(), Event::new().unwrap()); + test_msg.insert("Orange".to_owned(), Event::new().unwrap()); + test_msg.insert("Green".to_owned(), Event::new().unwrap()); + s1.send(&test_msg).unwrap(); + let mut recv_msg: HashMap = s2.recv().unwrap(); + + let mut test_msg_keys: Vec<_> = test_msg.keys().collect(); + test_msg_keys.sort(); + let mut recv_msg_keys: Vec<_> = recv_msg.keys().collect(); + recv_msg_keys.sort(); + assert_eq!(test_msg_keys, recv_msg_keys); + + for (key, test_event) in test_msg { + let recv_event = recv_msg.remove(&key).unwrap(); + test_event_pair(test_event, recv_event); + } +} diff --git a/base/tests/unix/main.rs b/base/tests/unix/main.rs new file mode 100644 index 0000000000..d3299cc88a --- /dev/null +++ b/base/tests/unix/main.rs @@ -0,0 +1,49 @@ +// Copyright 2022 The ChromiumOS Authors +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#![cfg(unix)] + +use std::path::Path; + +use base::safe_descriptor_from_path; +use base::Error; +use libc::EBADF; +use libc::EINVAL; + +/// Runs all unix specific integration tests in a single binary. +mod net; +mod scoped_signal_handler; +mod syslog; +mod tube; + +#[test] +fn safe_descriptor_from_path_valid() { + assert!(safe_descriptor_from_path(Path::new("/proc/self/fd/2")) + .unwrap() + .is_some()); +} + +#[test] +fn safe_descriptor_from_path_invalid_integer() { + assert_eq!( + safe_descriptor_from_path(Path::new("/proc/self/fd/blah")), + Err(Error::new(EINVAL)) + ); +} + +#[test] +fn safe_descriptor_from_path_invalid_fd() { + assert_eq!( + safe_descriptor_from_path(Path::new("/proc/self/fd/42")), + Err(Error::new(EBADF)) + ); +} + +#[test] +fn safe_descriptor_from_path_none() { + assert_eq!( + safe_descriptor_from_path(Path::new("/something/else")).unwrap(), + None + ); +} diff --git a/base/tests/unix/net.rs b/base/tests/unix/net.rs new file mode 100644 index 0000000000..b466a08fa9 --- /dev/null +++ b/base/tests/unix/net.rs @@ -0,0 +1,237 @@ +// Copyright 2022 The ChromiumOS Authors +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::env; +use std::io::ErrorKind; +use std::os::unix::io::AsRawFd; +use std::path::PathBuf; +use std::time::Duration; + +use base::UnixSeqpacket; +use base::UnixSeqpacketListener; +use base::UnlinkUnixSeqpacketListener; + +fn tmpdir() -> PathBuf { + env::temp_dir() +} + +#[test] +fn unix_seqpacket_path_not_exists() { + let res = UnixSeqpacket::connect("/path/not/exists"); + assert!(res.is_err()); +} + +#[test] +fn unix_seqpacket_listener_path() { + let mut socket_path = tmpdir(); + socket_path.push("unix_seqpacket_listener_path"); + let listener = UnlinkUnixSeqpacketListener( + UnixSeqpacketListener::bind(&socket_path).expect("failed to create UnixSeqpacketListener"), + ); + let listener_path = listener.path().expect("failed to get socket listener path"); + assert_eq!(socket_path, listener_path); +} + +#[test] +fn unix_seqpacket_listener_from_fd() { + let mut socket_path = tmpdir(); + socket_path.push("unix_seqpacket_listener_from_fd"); + let listener = UnlinkUnixSeqpacketListener( + UnixSeqpacketListener::bind(&socket_path).expect("failed to create UnixSeqpacketListener"), + ); + // UnixSeqpacketListener should succeed on a valid listening descriptor. + let good_dup = UnixSeqpacketListener::bind(&format!("/proc/self/fd/{}", unsafe { + libc::dup(listener.as_raw_fd()) + })); + let good_dup_path = good_dup + .expect("failed to create dup UnixSeqpacketListener") + .path(); + // Path of socket created by descriptor should be hidden. + assert!(good_dup_path.is_err()); + // UnixSeqpacketListener must fail on an existing non-listener socket. + let s1 = UnixSeqpacket::connect(socket_path.as_path()).expect("UnixSeqpacket::connect failed"); + let bad_dup = UnixSeqpacketListener::bind(&format!("/proc/self/fd/{}", unsafe { + libc::dup(s1.as_raw_fd()) + })); + assert!(bad_dup.is_err()); +} + +#[test] +fn unix_seqpacket_path_exists_pass() { + let mut socket_path = tmpdir(); + socket_path.push("path_to_socket"); + let _listener = UnlinkUnixSeqpacketListener( + UnixSeqpacketListener::bind(&socket_path).expect("failed to create UnixSeqpacketListener"), + ); + let _res = + UnixSeqpacket::connect(socket_path.as_path()).expect("UnixSeqpacket::connect failed"); +} + +#[test] +fn unix_seqpacket_path_listener_accept_with_timeout() { + let mut socket_path = tmpdir(); + socket_path.push("path_listerner_accept_with_timeout"); + let listener = UnlinkUnixSeqpacketListener( + UnixSeqpacketListener::bind(&socket_path).expect("failed to create UnixSeqpacketListener"), + ); + + for d in [Duration::from_millis(10), Duration::ZERO] { + let _ = listener.accept_with_timeout(d).expect_err(&format!( + "UnixSeqpacket::accept_with_timeout {:?} connected", + d + )); + + let s1 = UnixSeqpacket::connect(socket_path.as_path()) + .unwrap_or_else(|_| panic!("UnixSeqpacket::connect {:?} failed", d)); + + let s2 = listener + .accept_with_timeout(d) + .unwrap_or_else(|_| panic!("UnixSeqpacket::accept {:?} failed", d)); + + let data1 = &[0, 1, 2, 3, 4]; + let data2 = &[10, 11, 12, 13, 14]; + s2.send(data2).expect("failed to send data2"); + s1.send(data1).expect("failed to send data1"); + let recv_data = &mut [0; 5]; + s2.recv(recv_data).expect("failed to recv data"); + assert_eq!(data1, recv_data); + s1.recv(recv_data).expect("failed to recv data"); + assert_eq!(data2, recv_data); + } +} + +#[test] +fn unix_seqpacket_path_listener_accept() { + let mut socket_path = tmpdir(); + socket_path.push("path_listerner_accept"); + let listener = UnlinkUnixSeqpacketListener( + UnixSeqpacketListener::bind(&socket_path).expect("failed to create UnixSeqpacketListener"), + ); + let s1 = UnixSeqpacket::connect(socket_path.as_path()).expect("UnixSeqpacket::connect failed"); + + let s2 = listener.accept().expect("UnixSeqpacket::accept failed"); + + let data1 = &[0, 1, 2, 3, 4]; + let data2 = &[10, 11, 12, 13, 14]; + s2.send(data2).expect("failed to send data2"); + s1.send(data1).expect("failed to send data1"); + let recv_data = &mut [0; 5]; + s2.recv(recv_data).expect("failed to recv data"); + assert_eq!(data1, recv_data); + s1.recv(recv_data).expect("failed to recv data"); + assert_eq!(data2, recv_data); +} + +#[test] +fn unix_seqpacket_zero_timeout() { + let (s1, _s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); + // Timeouts less than a microsecond are too small and round to zero. + s1.set_read_timeout(Some(Duration::from_nanos(10))) + .expect_err("successfully set zero timeout"); +} + +#[test] +fn unix_seqpacket_read_timeout() { + let (s1, _s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); + s1.set_read_timeout(Some(Duration::from_millis(1))) + .expect("failed to set read timeout for socket"); + let _ = s1.recv(&mut [0]); +} + +#[test] +fn unix_seqpacket_write_timeout() { + let (s1, _s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); + s1.set_write_timeout(Some(Duration::from_millis(1))) + .expect("failed to set write timeout for socket"); +} + +#[test] +fn unix_seqpacket_send_recv() { + let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); + let data1 = &[0, 1, 2, 3, 4]; + let data2 = &[10, 11, 12, 13, 14]; + s2.send(data2).expect("failed to send data2"); + s1.send(data1).expect("failed to send data1"); + let recv_data = &mut [0; 5]; + s2.recv(recv_data).expect("failed to recv data"); + assert_eq!(data1, recv_data); + s1.recv(recv_data).expect("failed to recv data"); + assert_eq!(data2, recv_data); +} + +#[test] +fn unix_seqpacket_send_fragments() { + let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); + let data1 = &[0, 1, 2, 3, 4]; + let data2 = &[10, 11, 12, 13, 14, 15, 16]; + s1.send(data1).expect("failed to send data1"); + s1.send(data2).expect("failed to send data2"); + + let recv_data = &mut [0; 32]; + let size = s2.recv(recv_data).expect("failed to recv data"); + assert_eq!(size, data1.len()); + assert_eq!(data1, &recv_data[0..size]); + + let size = s2.recv(recv_data).expect("failed to recv data"); + assert_eq!(size, data2.len()); + assert_eq!(data2, &recv_data[0..size]); +} + +#[test] +fn unix_seqpacket_get_readable_bytes() { + let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); + assert_eq!(s1.get_readable_bytes().unwrap(), 0); + assert_eq!(s2.get_readable_bytes().unwrap(), 0); + let data1 = &[0, 1, 2, 3, 4]; + s1.send(data1).expect("failed to send data"); + + assert_eq!(s1.get_readable_bytes().unwrap(), 0); + assert_eq!(s2.get_readable_bytes().unwrap(), data1.len()); + + let recv_data = &mut [0; 5]; + s2.recv(recv_data).expect("failed to recv data"); + assert_eq!(s1.get_readable_bytes().unwrap(), 0); + assert_eq!(s2.get_readable_bytes().unwrap(), 0); +} + +#[test] +fn unix_seqpacket_next_packet_size() { + let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); + let data1 = &[0, 1, 2, 3, 4]; + s1.send(data1).expect("failed to send data"); + + assert_eq!(s2.next_packet_size().unwrap(), 5); + s1.set_read_timeout(Some(Duration::from_micros(1))) + .expect("failed to set read timeout"); + assert_eq!( + s1.next_packet_size().unwrap_err().kind(), + ErrorKind::WouldBlock + ); + drop(s2); + assert_eq!( + s1.next_packet_size().unwrap_err().kind(), + ErrorKind::ConnectionReset + ); +} + +#[test] +fn unix_seqpacket_recv_to_vec() { + let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); + let data1 = &[0, 1, 2, 3, 4]; + s1.send(data1).expect("failed to send data"); + + let recv_data = &mut vec![]; + s2.recv_to_vec(recv_data).expect("failed to recv data"); + assert_eq!(recv_data, &mut vec![0, 1, 2, 3, 4]); +} + +#[test] +fn unix_seqpacket_recv_as_vec() { + let (s1, s2) = UnixSeqpacket::pair().expect("failed to create socket pair"); + let data1 = &[0, 1, 2, 3, 4]; + s1.send(data1).expect("failed to send data"); + + let recv_data = s2.recv_as_vec().expect("failed to recv data"); + assert_eq!(recv_data, vec![0, 1, 2, 3, 4]); +} diff --git a/base/tests/unix/scoped_signal_handler.rs b/base/tests/unix/scoped_signal_handler.rs new file mode 100644 index 0000000000..29d4edf9cb --- /dev/null +++ b/base/tests/unix/scoped_signal_handler.rs @@ -0,0 +1,254 @@ +// Copyright 2022 The ChromiumOS Authors +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::fs::File; +use std::io::BufRead; +use std::io::BufReader; +use std::mem::zeroed; +use std::ptr::null; +use std::ptr::null_mut; +use std::sync::atomic::AtomicI32; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::MutexGuard; +use std::sync::Once; +use std::thread::sleep; +use std::thread::spawn; +use std::time::Duration; +use std::time::Instant; + +use base::platform::scoped_signal_handler::Error; +use base::platform::scoped_signal_handler::Result; +use base::platform::Error as ErrnoError; +use base::sys::ScopedSignalHandler; +use base::sys::Signal; + +use base::sys::clear_signal_handler; +use base::sys::has_default_signal_handler; +use base::sys::wait_for_interrupt; +use base::sys::SignalHandler; +use libc::sigaction; + +use base::platform::gettid; +use base::platform::kill; +use base::platform::Pid; + +const TEST_SIGNAL: Signal = Signal::User1; +const TEST_SIGNALS: &[Signal] = &[Signal::User1, Signal::User2]; + +static TEST_SIGNAL_COUNTER: AtomicUsize = AtomicUsize::new(0); + +/// Only allows one test case to execute at a time. +fn get_mutex() -> MutexGuard<'static, ()> { + static INIT: Once = Once::new(); + static mut VAL: Option>> = None; + + INIT.call_once(|| { + let val = Some(Arc::new(Mutex::new(()))); + // Safe because the mutation is protected by the Once. + unsafe { VAL = val } + }); + + // Safe mutation only happens in the Once. + unsafe { VAL.as_ref() }.unwrap().lock().unwrap() +} + +fn reset_counter() { + TEST_SIGNAL_COUNTER.swap(0, Ordering::SeqCst); +} + +fn get_sigaction(signal: Signal) -> Result { + // Safe because sigaction is owned and expected to be initialized ot zeros. + let mut sigact: sigaction = unsafe { zeroed() }; + + if unsafe { sigaction(signal.into(), null(), &mut sigact) } < 0 { + Err(Error::Sigaction(signal, ErrnoError::last())) + } else { + Ok(sigact) + } +} + +/// Safety: +/// This is only safe if the signal handler set in sigaction is safe. +unsafe fn restore_sigaction(signal: Signal, sigact: sigaction) -> Result { + if sigaction(signal.into(), &sigact, null_mut()) < 0 { + Err(Error::Sigaction(signal, ErrnoError::last())) + } else { + Ok(sigact) + } +} + +/// Safety: +/// Safe if the signal handler for Signal::User1 is safe. +unsafe fn send_test_signal() { + kill(gettid(), Signal::User1.into()).unwrap() +} + +macro_rules! assert_counter_eq { + ($compare_to:expr) => {{ + let expected: usize = $compare_to; + let got: usize = TEST_SIGNAL_COUNTER.load(Ordering::SeqCst); + if got != expected { + panic!( + "wrong signal counter value: got {}; expected {}", + got, expected + ); + } + }}; +} + +struct TestHandler; + +/// # Safety +/// Safe because handle_signal is async-signal safe. +unsafe impl SignalHandler for TestHandler { + fn handle_signal(signal: Signal) { + if TEST_SIGNAL == signal { + TEST_SIGNAL_COUNTER.fetch_add(1, Ordering::SeqCst); + } + } +} + +#[test] +fn scopedsignalhandler_success() { + // Prevent other test cases from running concurrently since the signal + // handlers are shared for the process. + let _guard = get_mutex(); + + reset_counter(); + assert_counter_eq!(0); + + assert!(has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); + let handler = ScopedSignalHandler::new::(&[TEST_SIGNAL]).unwrap(); + assert!(!has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); + + // Safe because test_handler is safe. + unsafe { send_test_signal() }; + + // Give the handler time to run in case it is on a different thread. + for _ in 1..40 { + if TEST_SIGNAL_COUNTER.load(Ordering::SeqCst) > 0 { + break; + } + sleep(Duration::from_millis(250)); + } + + assert_counter_eq!(1); + + drop(handler); + assert!(has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); +} + +#[test] +fn scopedsignalhandler_handleralreadyset() { + // Prevent other test cases from running concurrently since the signal + // handlers are shared for the process. + let _guard = get_mutex(); + + reset_counter(); + assert_counter_eq!(0); + + assert!(has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); + // Safe because TestHandler is async-signal safe. + let handler = ScopedSignalHandler::new::(&[TEST_SIGNAL]).unwrap(); + assert!(!has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); + + // Safe because TestHandler is async-signal safe. + assert!(matches!( + ScopedSignalHandler::new::(TEST_SIGNALS), + Err(Error::HandlerAlreadySet(Signal::User1)) + )); + + assert_counter_eq!(0); + drop(handler); + assert!(has_default_signal_handler(TEST_SIGNAL.into()).unwrap()); +} + +/// Stores the thread used by WaitForInterruptHandler. +static WAIT_FOR_INTERRUPT_THREAD_ID: AtomicI32 = AtomicI32::new(0); +/// Forwards SIGINT to the appropriate thread. +struct WaitForInterruptHandler; + +/// # Safety +/// Safe because handle_signal is async-signal safe. +unsafe impl SignalHandler for WaitForInterruptHandler { + fn handle_signal(_: Signal) { + let tid = WAIT_FOR_INTERRUPT_THREAD_ID.load(Ordering::SeqCst); + // If the thread ID is set and executed on the wrong thread, forward the signal. + if tid != 0 && gettid() != tid { + // Safe because the handler is safe and the target thread id is expecting the signal. + unsafe { kill(tid, Signal::Interrupt.into()) }.unwrap(); + } + } +} + +/// Query /proc/${tid}/status for its State and check if it is either S (sleeping) or in +/// D (disk sleep). +fn thread_is_sleeping(tid: Pid) -> std::result::Result { + const PREFIX: &str = "State:"; + let mut status_reader = BufReader::new(File::open(format!("/proc/{}/status", tid))?); + let mut line = String::new(); + loop { + let count = status_reader.read_line(&mut line)?; + if count == 0 { + return Err(ErrnoError::new(libc::EIO)); + } + if let Some(stripped) = line.strip_prefix(PREFIX) { + return Ok(matches!( + stripped.trim_start().chars().next(), + Some('S') | Some('D') + )); + } + line.clear(); + } +} + +/// Wait for a process to block either in a sleeping or disk sleep state. +fn wait_for_thread_to_sleep(tid: Pid, timeout: Duration) -> std::result::Result<(), ErrnoError> { + let start = Instant::now(); + loop { + if thread_is_sleeping(tid)? { + return Ok(()); + } + if start.elapsed() > timeout { + return Err(ErrnoError::new(libc::EAGAIN)); + } + sleep(Duration::from_millis(50)); + } +} + +#[test] +fn waitforinterrupt_success() { + // Prevent other test cases from running concurrently since the signal + // handlers are shared for the process. + let _guard = get_mutex(); + + let to_restore = get_sigaction(Signal::Interrupt).unwrap(); + clear_signal_handler(Signal::Interrupt.into()).unwrap(); + // Safe because TestHandler is async-signal safe. + let handler = + ScopedSignalHandler::new::(&[Signal::Interrupt]).unwrap(); + + let tid = gettid(); + WAIT_FOR_INTERRUPT_THREAD_ID.store(tid, Ordering::SeqCst); + + let join_handle = spawn(move || -> std::result::Result<(), ErrnoError> { + // Wait unitl the thread is ready to receive the signal. + wait_for_thread_to_sleep(tid, Duration::from_secs(10)).unwrap(); + + // Safe because the SIGINT handler is safe. + unsafe { kill(tid, Signal::Interrupt.into()) } + }); + let wait_ret = wait_for_interrupt(); + let join_ret = join_handle.join(); + + drop(handler); + // Safe because we are restoring the previous SIGINT handler. + unsafe { restore_sigaction(Signal::Interrupt, to_restore) }.unwrap(); + + wait_ret.unwrap(); + join_ret.unwrap().unwrap(); +} diff --git a/base/tests/unix/syslog.rs b/base/tests/unix/syslog.rs new file mode 100644 index 0000000000..52a074ecce --- /dev/null +++ b/base/tests/unix/syslog.rs @@ -0,0 +1,66 @@ +// Copyright 2022 The ChromiumOS Authors +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::io::Read; +use std::io::Seek; +use std::io::SeekFrom; +use std::sync::Once; + +use base::syslog::*; + +static EARLY_INIT_ONCE: Once = Once::new(); + +pub fn setup() { + EARLY_INIT_ONCE.call_once(|| { + early_init(); + }); +} + +#[test] +fn fds() { + setup(); + let mut fds = Vec::new(); + push_descriptors(&mut fds); + assert!(!fds.is_empty()); + for fd in fds { + assert!(fd >= 0); + } +} + +#[test] +fn syslog_file() { + setup(); + let mut file = tempfile::tempfile().expect("failed to create tempfile"); + + let syslog_file = file.try_clone().expect("error cloning shared memory file"); + let state = State::new(LogConfig { + pipe: Some(Box::new(syslog_file)), + ..Default::default() + }) + .unwrap(); + + const TEST_STR: &str = "hello shared memory file"; + state.log( + &log::RecordBuilder::new() + .level(Level::Error) + .args(format_args!("{}", TEST_STR)) + .build(), + ); + + file.seek(SeekFrom::Start(0)) + .expect("error seeking shared memory file"); + let mut buf = String::new(); + file.read_to_string(&mut buf) + .expect("error reading shared memory file"); + assert!(buf.contains(TEST_STR)); +} + +#[test] +fn macros() { + setup(); + log::error!("this is an error {}", 3); + log::warn!("this is a warning {}", "uh oh"); + log::info!("this is info {}", true); + log::debug!("this is debug info {:?}", Some("helpful stuff")); +} diff --git a/base/tests/unix/tube.rs b/base/tests/unix/tube.rs new file mode 100644 index 0000000000..263cade14e --- /dev/null +++ b/base/tests/unix/tube.rs @@ -0,0 +1,96 @@ +// Copyright 2022 The ChromiumOS Authors +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::time; + +use base::deserialize_with_descriptors; +use base::BlockingMode; +use base::EventContext; +use base::EventToken; +use base::FramingMode; +use base::FromRawDescriptor; +use base::ReadNotifier; +use base::SafeDescriptor; +use base::SerializeDescriptors; +use base::StreamChannel; +use base::Tube; +use base::UnixSeqpacket; + +#[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)] +enum Token { + ReceivedData, +} + +const EVENT_WAIT_TIME: time::Duration = time::Duration::from_secs(10); + +#[test] +fn test_serialize_tube_new() { + let (sock_send, sock_recv) = + StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Message).unwrap(); + let tube_send = Tube::new(sock_send).unwrap(); + let tube_recv = Tube::new(sock_recv).unwrap(); + + // Serialize the Tube + let msg_serialize = SerializeDescriptors::new(&tube_send); + let serialized = serde_json::to_vec(&msg_serialize).unwrap(); + let msg_descriptors = msg_serialize.into_descriptors(); + + // Deserialize the Tube + let mut msg_descriptors_safe = msg_descriptors + .into_iter() + .map(|v| Some(unsafe { SafeDescriptor::from_raw_descriptor(v) })) + .collect(); + let tube_deserialized: Tube = deserialize_with_descriptors( + || serde_json::from_slice(&serialized), + &mut msg_descriptors_safe, + ) + .unwrap(); + + // Send a message through deserialized Tube + tube_deserialized.send(&"hi".to_string()).unwrap(); + + // Wait for the message to arrive + let event_ctx: EventContext = + EventContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]).unwrap(); + let events = event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap(); + let tokens: Vec = events + .iter() + .filter(|e| e.is_readable) + .map(|e| e.token) + .collect(); + assert_eq!(tokens, vec! {Token::ReceivedData}); + + assert_eq!(tube_recv.recv::().unwrap(), "hi"); +} + +#[test] +fn test_send_recv_new_from_seqpacket() { + let (sock_send, sock_recv) = UnixSeqpacket::pair().unwrap(); + let tube_send = Tube::new_from_unix_seqpacket(sock_send); + let tube_recv = Tube::new_from_unix_seqpacket(sock_recv); + + tube_send.send(&"hi".to_string()).unwrap(); + + // Wait for the message to arrive + let event_ctx: EventContext = + EventContext::build_with(&[(tube_recv.get_read_notifier(), Token::ReceivedData)]).unwrap(); + let events = event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap(); + let tokens: Vec = events + .iter() + .filter(|e| e.is_readable) + .map(|e| e.token) + .collect(); + assert_eq!(tokens, vec! {Token::ReceivedData}); + + assert_eq!(tube_recv.recv::().unwrap(), "hi"); +} + +#[test] +fn test_tube_new_byte_mode_error() { + let (sock_byte_mode, _) = + StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap(); + let tube_error = Tube::new(sock_byte_mode); + + assert!(tube_error.is_err()); +} diff --git a/tools/impl/test_config.py b/tools/impl/test_config.py index 1e79ed7b4a..3d3520e3ce 100755 --- a/tools/impl/test_config.py +++ b/tools/impl/test_config.py @@ -78,7 +78,6 @@ WIN64_DISABLED_CRATES = [ ] CRATE_OPTIONS: Dict[str, List[TestOption]] = { - "base": [TestOption.SINGLE_THREADED, TestOption.LARGE], "cros_async": [TestOption.LARGE, TestOption.RUN_EXCLUSIVE], "crosvm": [TestOption.SINGLE_THREADED, TestOption.UNIT_AS_INTEGRATION_TEST], "crosvm_plugin": [ diff --git a/tools/impl/test_runner.py b/tools/impl/test_runner.py index cf97f58865..b64c85c4db 100644 --- a/tools/impl/test_runner.py +++ b/tools/impl/test_runner.py @@ -15,7 +15,7 @@ from pathlib import Path from typing import Dict, Iterable, List, NamedTuple, Optional from . import test_target, testvm -from .common import all_tracked_files +from .common import all_tracked_files, very_verbose from .test_config import BUILD_FEATURES, CRATE_OPTIONS, TestOption from .test_target import TestTarget, Triple @@ -262,10 +262,11 @@ def build_all_binaries(target: TestTarget, crosvm_direct: bool, instrument_cover cargo_args = [ "--features=" + features, f"--target={target.build_triple}", - "--verbose", "--workspace", *[f"--exclude={crate}" for crate in get_workspace_excludes(target.build_triple)], ] + if very_verbose(): + cargo_args.append("--verbose") cargo_args.extend(extra_args) yield from cargo_build_executables( @@ -291,7 +292,13 @@ def get_test_timeout(target: TestTarget, executable: Executable): return timeout * EMULATION_TIMEOUT_MULTIPLIER -def execute_test(target: TestTarget, attempts: int, collect_coverage: bool, executable: Executable): +def execute_test( + target: TestTarget, + attempts: int, + collect_coverage: bool, + integration_test: bool, + executable: Executable, +): """ Executes a single test on the given test targed @@ -301,7 +308,7 @@ def execute_test(target: TestTarget, attempts: int, collect_coverage: bool, exec """ options = CRATE_OPTIONS.get(executable.crate_name, []) args: List[str] = [] - if TestOption.SINGLE_THREADED in options: + if TestOption.SINGLE_THREADED in options or integration_test: args += ["--test-threads=1"] binary_path = executable.binary_path @@ -400,7 +407,9 @@ def execute_all( sys.stdout.flush() with Pool(PARALLELISM) as pool: for result in pool.imap( - functools.partial(execute_test, unit_test_target, attempts, collect_coverage), + functools.partial( + execute_test, unit_test_target, attempts, collect_coverage, False + ), unit_tests, ): print_test_progress(result) @@ -416,7 +425,9 @@ def execute_all( ) sys.stdout.flush() for executable in integration_tests: - result = execute_test(integration_test_target, attempts, collect_coverage, executable) + result = execute_test( + integration_test_target, attempts, collect_coverage, True, executable + ) print_test_progress(result) yield result print()