salsa/tests/parallel/parallel_cycle_mid_recover.rs

103 lines
2.6 KiB
Rust
Raw Normal View History

2022-08-09 10:06:39 +00:00
//! Test for cycle recover spread across two threads.
//! See `../cycles.rs` for a complete listing of cycle tests,
//! both intra and cross thread.
use crate::setup::{Knobs, KnobsDatabase};
2022-08-09 10:06:39 +00:00
2024-07-16 10:04:01 +00:00
#[salsa::input]
2022-08-09 10:06:39 +00:00
pub(crate) struct MyInput {
2022-08-09 10:33:46 +00:00
field: i32,
2022-08-09 10:06:39 +00:00
}
2024-07-16 10:04:01 +00:00
#[salsa::tracked]
pub(crate) fn a1(db: &dyn KnobsDatabase, input: MyInput) -> i32 {
2022-08-09 10:06:39 +00:00
// tell thread b we have started
db.signal(1);
// wait for thread b to block on a1
db.wait_for(2);
a2(db, input)
}
2024-07-17 13:18:43 +00:00
2024-07-16 10:04:01 +00:00
#[salsa::tracked]
pub(crate) fn a2(db: &dyn KnobsDatabase, input: MyInput) -> i32 {
2022-08-09 10:06:39 +00:00
// create the cycle
b1(db, input)
}
2024-07-17 13:18:43 +00:00
#[salsa::tracked(recovery_fn=recover_b1)]
pub(crate) fn b1(db: &dyn KnobsDatabase, input: MyInput) -> i32 {
2022-08-09 10:06:39 +00:00
// wait for thread a to have started
db.wait_for(1);
b2(db, input)
}
// Cycle recovery function named by `b1`'s `recovery_fn` attribute.
// Ignores the cycle description and returns `field * 20 + 2`; for the input
// used by the test (`field == 1`) that is the expected value 22.
fn recover_b1(db: &dyn KnobsDatabase, _cycle: &salsa::Cycle, key: MyInput) -> i32 {
    // Trace that this recovery path actually ran.
    dbg!("recover_b1");

    let field = key.field(db);
    field * 20 + 2
}
2024-07-16 10:04:01 +00:00
#[salsa::tracked]
pub(crate) fn b2(db: &dyn KnobsDatabase, input: MyInput) -> i32 {
2022-08-09 10:06:39 +00:00
// will encounter a cycle but recover
b3(db, input);
b1(db, input); // hasn't recovered yet
0
}
2024-07-17 13:18:43 +00:00
#[salsa::tracked(recovery_fn=recover_b3)]
pub(crate) fn b3(db: &dyn KnobsDatabase, input: MyInput) -> i32 {
2022-08-09 10:06:39 +00:00
// will block on thread a, signaling stage 2
a1(db, input)
}
// Cycle recovery function named by `b3`'s `recovery_fn` attribute.
// Ignores the cycle description and returns `field * 200 + 2`. The only
// caller of `b3` in this file (`b2`) discards that value.
fn recover_b3(db: &dyn KnobsDatabase, _cycle: &salsa::Cycle, key: MyInput) -> i32 {
    // Trace that this recovery path actually ran.
    dbg!("recover_b3");

    let field = key.field(db);
    field * 200 + 2
}
2022-08-09 10:33:46 +00:00
// Recover cycle test:
//
// The pattern is as follows.
//
// Thread A                   Thread B
// --------                   --------
// a1                         b1
// |                          wait for stage 1 (blocks)
// signal stage 1             |
// wait for stage 2 (blocks)  (unblocked)
// |                          |
// |                          b2
// |                          b3
// |                          a1 (blocks -> stage 2)
// (unblocked)                |
// a2 (cycle detected)        |
//                            b3 recovers
//                            b2 resumes
//                            b1 recovers
// Drives the two-thread cycle: thread A evaluates `a1`, thread B evaluates
// `b1`, and both are expected to end up with the value from `recover_b1`.
#[test]
fn execute() {
    let db = Knobs::default();
    let input = MyInput::new(&db, 1);

    // Thread A gets its own database handle and starts at `a1`.
    let db_a = db.clone();
    let thread_a = std::thread::spawn(move || a1(&db_a, input));

    // Thread B's handle is configured before the thread is spawned, so the
    // knob is in place before `b1` can start running.
    // NOTE(review): the knob stores 3 while the comments in this file talk
    // about "stage 2"; presumably `wait_for(2)` is satisfied by any stage
    // >= 2 -- confirm against the `setup` module.
    let db_b = db.clone();
    db_b.knobs().signal_on_will_block.store(3);
    let thread_b = std::thread::spawn(move || b1(&db_b, input));

    // We expect that the recovery function yields
    // `1 * 20 + 2`, which is returned (and forwarded)
    // to b1, and from there to a2 and a1.
    let result_a = thread_a.join().unwrap();
    assert_eq!(result_a, 22);

    let result_b = thread_b.join().unwrap();
    assert_eq!(result_b, 22);
}