From bfa74bc8653fa573b87e3af4be0bf0ed63db119b Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Thu, 11 Nov 2021 04:59:43 -0500 Subject: [PATCH] spread parallel tests into their own files I was finding the parallel test setup hard to read, so everything relating to one test is now in a single file, with shorter names. --- tests/parallel/cycles.rs | 169 ------------------ tests/parallel/main.rs | 3 +- tests/parallel/parallel_cycle_all_recover.rs | 110 ++++++++++++ tests/parallel/parallel_cycle_none_recover.rs | 71 ++++++++ tests/parallel/setup.rs | 27 +-- 5 files changed, 188 insertions(+), 192 deletions(-) delete mode 100644 tests/parallel/cycles.rs create mode 100644 tests/parallel/parallel_cycle_all_recover.rs create mode 100644 tests/parallel/parallel_cycle_none_recover.rs diff --git a/tests/parallel/cycles.rs b/tests/parallel/cycles.rs deleted file mode 100644 index 6170fdb..0000000 --- a/tests/parallel/cycles.rs +++ /dev/null @@ -1,169 +0,0 @@ -//! Tests for cycles that occur across threads. See the -//! `../cycles.rs` for a complete listing of cycle tests, -//! both intra and cross thread. - -use crate::setup::{Knobs, ParDatabase, ParDatabaseImpl}; -use salsa::ParallelDatabase; -use test_env_log::test; - -// Recover cycle test: -// -// The pattern is as follows. -// -// Thread A Thread B -// -------- -------- -// a1 b1 -// | wait for stage 1 (blocks) -// signal stage 1 | -// wait for stage 2 (blocks) (unblocked) -// | signal stage 2 -// (unblocked) wait for stage 3 (blocks) -// a2 | -// b1 (blocks -> stage 3) | -// | (unblocked) -// | b2 -// | a1 (cycle detected, recovers) -// | b2 completes, recovers -// | b1 completes, recovers -// a2 sees cycle, recovers -// a1 completes, recovers - -pub(crate) fn recover_from_cycle_a1( - _db: &dyn ParDatabase, - _cycle: &salsa::Cycle, - key: &i32, -) -> i32 { - log::debug!("recover_from_cycle_a1"); - key * 10 + 1 -} - -pub(crate) fn recover_from_cycle_a2( - _db: &dyn ParDatabase, - _cycle: &salsa::Cycle, - key: &i32, -) -> i32 { - log::debug!("recover_from_cycle_a2"); - key * 10 + 2 -} - -pub(crate) fn recover_from_cycle_b1( - _db: &dyn ParDatabase, - _cycle: &salsa::Cycle, - key: &i32, -) -> i32 { - log::debug!("recover_from_cycle_b1"); - key * 20 + 1 -} - -pub(crate) fn recover_from_cycle_b2( - _db: &dyn ParDatabase, - _cycle: &salsa::Cycle, - key: &i32, -) -> i32 { - log::debug!("recover_from_cycle_b2"); - key * 20 + 2 -} - -pub(crate) fn recover_cycle_a1(db: &dyn ParDatabase, key: i32) -> i32 { - // Wait to create the cycle until both threads have entered - db.signal(1); - db.wait_for(2); - - db.recover_cycle_a2(key) -} - -pub(crate) fn recover_cycle_a2(db: &dyn ParDatabase, key: i32) -> i32 { - db.recover_cycle_b1(key) -} - -pub(crate) fn recover_cycle_b1(db: &dyn ParDatabase, key: i32) -> i32 { - // Wait to create the cycle until both threads have entered - db.wait_for(1); - db.signal(2); - - // Wait for thread A to block on this thread - db.wait_for(3); - - db.recover_cycle_b2(key) -} - -pub(crate) fn recover_cycle_b2(db: &dyn ParDatabase, key: i32) -> i32 { - db.recover_cycle_a1(key) -} - -pub(crate) fn panic_cycle_a(db: &dyn ParDatabase, key: i32) -> i32 { - // Wait to create the cycle until both threads have entered - db.signal(1); - db.wait_for(2); - - db.panic_cycle_b(key) -} - -pub(crate) fn panic_cycle_b(db: &dyn ParDatabase, key: i32) -> i32 { - // Wait to create the cycle until both threads have entered - db.wait_for(1); - db.signal(2); - - // Wait for thread A to block on this thread - db.wait_for(3); - - // Now try to execute A - db.panic_cycle_a(key) -} - -#[test] -fn recover_parallel_cycle() { - let db = ParDatabaseImpl::default(); - db.knobs().signal_on_will_block.set(3); - - let thread_a = std::thread::spawn({ - let db = db.snapshot(); - move || db.recover_cycle_a1(1) - }); - - let thread_b = std::thread::spawn({ - let db = db.snapshot(); - move || db.recover_cycle_b1(1) - }); - - assert_eq!(thread_a.join().unwrap(), 11); - assert_eq!(thread_b.join().unwrap(), 21); -} - -#[test] -fn panic_parallel_cycle() { - let db = ParDatabaseImpl::default(); - db.knobs().signal_on_will_block.set(3); - - let thread_a = std::thread::spawn({ - let db = db.snapshot(); - move || db.panic_cycle_a(-1) - }); - - let thread_b = std::thread::spawn({ - let db = db.snapshot(); - move || db.panic_cycle_b(-1) - }); - - // We expect B to panic because it detects a cycle (it is the one that calls A, ultimately). - // Right now, it panics with a string. - let err_b = thread_b.join().unwrap_err(); - if let Some(c) = err_b.downcast_ref::() { - insta::assert_debug_snapshot!(c.unexpected_participants(&db), @r###" - [ - "panic_cycle_a(-1)", - "panic_cycle_b(-1)", - ] - "###); - } else { - panic!("b failed in an unexpected way: {:?}", err_b); - } - - // We expect A to propagate a panic, which causes us to use the sentinel - // type `Canceled`. - assert!(thread_a - .join() - .unwrap_err() - .downcast_ref::() - .is_some()); -} diff --git a/tests/parallel/main.rs b/tests/parallel/main.rs index 34cd5d8..cac34b2 100644 --- a/tests/parallel/main.rs +++ b/tests/parallel/main.rs @@ -1,9 +1,10 @@ mod setup; mod cancellation; -mod cycles; mod frozen; mod independent; +mod parallel_cycle_all_recover; +mod parallel_cycle_none_recover; mod race; mod signal; mod stress; diff --git a/tests/parallel/parallel_cycle_all_recover.rs b/tests/parallel/parallel_cycle_all_recover.rs new file mode 100644 index 0000000..f3e5609 --- /dev/null +++ b/tests/parallel/parallel_cycle_all_recover.rs @@ -0,0 +1,110 @@ +//! Test for cycle recover spread across two threads. +//! See `../cycles.rs` for a complete listing of cycle tests, +//! both intra and cross thread. + +use crate::setup::{Knobs, ParDatabaseImpl}; +use salsa::ParallelDatabase; +use test_env_log::test; + +// Recover cycle test: +// +// The pattern is as follows. +// +// Thread A Thread B +// -------- -------- +// a1 b1 +// | wait for stage 1 (blocks) +// signal stage 1 | +// wait for stage 2 (blocks) (unblocked) +// | signal stage 2 +// (unblocked) wait for stage 3 (blocks) +// a2 | +// b1 (blocks -> stage 3) | +// | (unblocked) +// | b2 +// | a1 (cycle detected, recovers) +// | b2 completes, recovers +// | b1 completes, recovers +// a2 sees cycle, recovers +// a1 completes, recovers + +#[test] +fn parallel_cycle_all_recover() { + let db = ParDatabaseImpl::default(); + db.knobs().signal_on_will_block.set(3); + + let thread_a = std::thread::spawn({ + let db = db.snapshot(); + move || db.a1(1) + }); + + let thread_b = std::thread::spawn({ + let db = db.snapshot(); + move || db.b1(1) + }); + + assert_eq!(thread_a.join().unwrap(), 11); + assert_eq!(thread_b.join().unwrap(), 21); +} + +#[salsa::query_group(ParallelCycleAllRecover)] +pub(crate) trait TestDatabase: Knobs { + #[salsa::cycle(recover_a1)] + fn a1(&self, key: i32) -> i32; + + #[salsa::cycle(recover_a2)] + fn a2(&self, key: i32) -> i32; + + #[salsa::cycle(recover_b1)] + fn b1(&self, key: i32) -> i32; + + #[salsa::cycle(recover_b2)] + fn b2(&self, key: i32) -> i32; +} + +fn recover_a1(_db: &dyn TestDatabase, _cycle: &salsa::Cycle, key: &i32) -> i32 { + log::debug!("recover_a1"); + key * 10 + 1 +} + +fn recover_a2(_db: &dyn TestDatabase, _cycle: &salsa::Cycle, key: &i32) -> i32 { + log::debug!("recover_a2"); + key * 10 + 2 +} + +fn recover_b1(_db: &dyn TestDatabase, _cycle: &salsa::Cycle, key: &i32) -> i32 { + log::debug!("recover_b1"); + key * 20 + 1 +} + +fn recover_b2(_db: &dyn TestDatabase, _cycle: &salsa::Cycle, key: &i32) -> i32 { + log::debug!("recover_b2"); + key * 20 + 2 +} + +fn a1(db: &dyn TestDatabase, key: i32) -> i32 { + // Wait to create the cycle until both threads have entered + db.signal(1); + db.wait_for(2); + + db.a2(key) +} + +fn a2(db: &dyn TestDatabase, key: i32) -> i32 { + db.b1(key) +} + +fn b1(db: &dyn TestDatabase, key: i32) -> i32 { + // Wait to create the cycle until both threads have entered + db.wait_for(1); + db.signal(2); + + // Wait for thread A to block on this thread + db.wait_for(3); + + db.b2(key) +} + +fn b2(db: &dyn TestDatabase, key: i32) -> i32 { + db.a1(key) +} diff --git a/tests/parallel/parallel_cycle_none_recover.rs b/tests/parallel/parallel_cycle_none_recover.rs new file mode 100644 index 0000000..9cb8f2f --- /dev/null +++ b/tests/parallel/parallel_cycle_none_recover.rs @@ -0,0 +1,71 @@ +//! Test a cycle where no queries recover that occurs across threads. +//! See the `../cycles.rs` for a complete listing of cycle tests, +//! both intra and cross thread. + +use crate::setup::{Knobs, ParDatabaseImpl}; +use salsa::ParallelDatabase; +use test_env_log::test; + +#[test] +fn parallel_cycle_none_recover() { + let db = ParDatabaseImpl::default(); + db.knobs().signal_on_will_block.set(3); + + let thread_a = std::thread::spawn({ + let db = db.snapshot(); + move || db.a(-1) + }); + + let thread_b = std::thread::spawn({ + let db = db.snapshot(); + move || db.b(-1) + }); + + // We expect B to panic because it detects a cycle (it is the one that calls A, ultimately). + // Right now, it panics with a string. + let err_b = thread_b.join().unwrap_err(); + if let Some(c) = err_b.downcast_ref::() { + insta::assert_debug_snapshot!(c.unexpected_participants(&db), @r###" + [ + "a(-1)", + "b(-1)", + ] + "###); + } else { + panic!("b failed in an unexpected way: {:?}", err_b); + } + + // We expect A to propagate a panic, which causes us to use the sentinel + // type `Canceled`. + assert!(thread_a + .join() + .unwrap_err() + .downcast_ref::() + .is_some()); +} + +#[salsa::query_group(ParallelCycleNoneRecover)] +pub(crate) trait TestDatabase: Knobs { + fn a(&self, key: i32) -> i32; + fn b(&self, key: i32) -> i32; +} + +fn a(db: &dyn TestDatabase, key: i32) -> i32 { + // Wait to create the cycle until both threads have entered + db.signal(1); + db.wait_for(2); + + db.b(key) +} + +fn b(db: &dyn TestDatabase, key: i32) -> i32 { + // Wait to create the cycle until both threads have entered + db.wait_for(1); + db.signal(2); + + // Wait for thread A to block on this thread + db.wait_for(3); + + // Now try to execute A + db.a(key) +} diff --git a/tests/parallel/setup.rs b/tests/parallel/setup.rs index 9691c3c..aa587c3 100644 --- a/tests/parallel/setup.rs +++ b/tests/parallel/setup.rs @@ -26,27 +26,6 @@ pub(crate) trait ParDatabase: Knobs { /// Invokes `sum2_drop_sum` fn sum3_drop_sum(&self, key: &'static str) -> usize; - - #[salsa::cycle(crate::cycles::recover_from_cycle_a1)] - #[salsa::invoke(crate::cycles::recover_cycle_a1)] - fn recover_cycle_a1(&self, key: i32) -> i32; - - #[salsa::cycle(crate::cycles::recover_from_cycle_a2)] - #[salsa::invoke(crate::cycles::recover_cycle_a2)] - fn recover_cycle_a2(&self, key: i32) -> i32; - - #[salsa::cycle(crate::cycles::recover_from_cycle_b1)] - #[salsa::invoke(crate::cycles::recover_cycle_b1)] - fn recover_cycle_b1(&self, key: i32) -> i32; - - #[salsa::cycle(crate::cycles::recover_from_cycle_b2)] - #[salsa::invoke(crate::cycles::recover_cycle_b2)] - fn recover_cycle_b2(&self, key: i32) -> i32; - - #[salsa::invoke(crate::cycles::panic_cycle_a)] - fn panic_cycle_a(&self, key: i32) -> i32; - #[salsa::invoke(crate::cycles::panic_cycle_b)] - fn panic_cycle_b(&self, key: i32) -> i32; } /// Various "knobs" and utilities used by tests to force @@ -178,7 +157,11 @@ fn sum3_drop_sum(db: &dyn ParDatabase, key: &'static str) -> usize { db.sum2_drop_sum(key) } -#[salsa::database(Par)] +#[salsa::database( + Par, + crate::parallel_cycle_all_recover::ParallelCycleAllRecover, + crate::parallel_cycle_none_recover::ParallelCycleNoneRecover +)] #[derive(Default)] pub(crate) struct ParDatabaseImpl { storage: salsa::Storage,