mirror of
https://github.com/salsa-rs/salsa.git
synced 2024-10-23 04:46:35 +00:00
add a test for handle cancellation
I realized there weren't any!
This commit is contained in:
parent
a5395665ce
commit
8a3cc6e404
6 changed files with 101 additions and 8 deletions
|
@ -53,6 +53,11 @@ pub enum EventKind {
|
|||
/// the current revision has been cancelled.
|
||||
WillCheckCancellation,
|
||||
|
||||
/// Indicates that one [`Handle`](`crate::Handle`) has set the cancellation flag.
|
||||
/// When other active handles execute salsa methods, they will observe this flag
|
||||
/// and panic with a sentinel value of type [`Cancelled`](`crate::Cancelled`).
|
||||
DidSetCancellationFlag,
|
||||
|
||||
/// Discovered that a query used to output a given output but no longer does.
|
||||
WillDiscardStaleOutput {
|
||||
/// Key for the query that is executing and which no longer outputs the given value.
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::sync::Arc;
|
|||
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
|
||||
use crate::storage::HasStorage;
|
||||
use crate::{storage::HasStorage, Event, EventKind};
|
||||
|
||||
/// A database "handle" allows coordination of multiple async tasks accessing the same database.
|
||||
/// So long as you are just doing reads, you can freely clone.
|
||||
|
@ -49,6 +49,12 @@ impl<Db: HasStorage> Handle<Db> {
|
|||
let storage = self.db.storage();
|
||||
storage.runtime().set_cancellation_flag();
|
||||
|
||||
self.db.salsa_event(Event {
|
||||
thread_id: std::thread::current().id(),
|
||||
|
||||
kind: EventKind::DidSetCancellationFlag,
|
||||
});
|
||||
|
||||
let mut clones = self.coordinate.clones.lock();
|
||||
while *clones != 1 {
|
||||
self.coordinate.cvar.wait(&mut clones);
|
||||
|
|
|
@ -325,10 +325,6 @@ impl LocalState {
|
|||
kind: EventKind::WillCheckCancellation,
|
||||
});
|
||||
if runtime.load_cancellation_flag() {
|
||||
db.salsa_event(Event {
|
||||
thread_id,
|
||||
kind: EventKind::WillCheckCancellation,
|
||||
});
|
||||
self.unwind_cancelled(runtime);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
mod setup;
|
||||
|
||||
mod parallel_cancellation;
|
||||
mod parallel_cycle_all_recover;
|
||||
mod parallel_cycle_mid_recover;
|
||||
mod parallel_cycle_none_recover;
|
||||
|
|
76
tests/parallel/parallel_cancellation.rs
Normal file
76
tests/parallel/parallel_cancellation.rs
Normal file
|
@ -0,0 +1,76 @@
|
|||
//! Test for cycle recover spread across two threads.
|
||||
//! See `../cycles.rs` for a complete listing of cycle tests,
|
||||
//! both intra and cross thread.
|
||||
|
||||
use salsa::Cancelled;
|
||||
use salsa::Handle;
|
||||
use salsa::Setter;
|
||||
|
||||
use crate::setup::Database;
|
||||
use crate::setup::Knobs;
|
||||
|
||||
#[salsa::db]
|
||||
pub(crate) trait Db: salsa::Database + Knobs {}
|
||||
|
||||
#[salsa::db]
|
||||
impl<T: salsa::Database + Knobs> Db for T {}
|
||||
|
||||
#[salsa::input]
|
||||
struct MyInput {
|
||||
field: i32,
|
||||
}
|
||||
|
||||
#[salsa::tracked]
|
||||
fn a1(db: &dyn Db, input: MyInput) -> MyInput {
|
||||
db.signal(1);
|
||||
db.wait_for(2);
|
||||
dummy(db, input)
|
||||
}
|
||||
|
||||
#[salsa::tracked]
|
||||
fn dummy(_db: &dyn Db, _input: MyInput) -> MyInput {
|
||||
panic!("should never get here!")
|
||||
}
|
||||
|
||||
// Cancellation signalling test
|
||||
//
|
||||
// The pattern is as follows.
|
||||
//
|
||||
// Thread A Thread B
|
||||
// -------- --------
|
||||
// a1
|
||||
// | wait for stage 1
|
||||
// signal stage 1 set input, triggers cancellation
|
||||
// wait for stage 2 (blocks) triggering cancellation sends stage 2
|
||||
// |
|
||||
// (unblocked)
|
||||
// dummy
|
||||
// panics
|
||||
|
||||
#[test]
|
||||
fn execute() {
|
||||
let mut db = Handle::new(Database::default());
|
||||
db.knobs().signal_on_will_block.store(3);
|
||||
|
||||
let input = MyInput::new(&*db, 1);
|
||||
|
||||
let thread_a = std::thread::spawn({
|
||||
let db = db.clone();
|
||||
move || a1(&*db, input)
|
||||
});
|
||||
|
||||
input.set_field(db.get_mut()).to(2);
|
||||
|
||||
// Assert thread A *should* was cancelled
|
||||
let cancelled = thread_a
|
||||
.join()
|
||||
.unwrap_err()
|
||||
.downcast::<Cancelled>()
|
||||
.unwrap();
|
||||
|
||||
// and inspect the output
|
||||
expect_test::expect![[r#"
|
||||
PendingWrite
|
||||
"#]]
|
||||
.assert_debug_eq(&cancelled);
|
||||
}
|
|
@ -21,8 +21,11 @@ pub(crate) struct KnobsStruct {
|
|||
/// threads to ensure we reach various weird states.
|
||||
pub(crate) signal: Signal,
|
||||
|
||||
/// When this database is about to block, send a signal.
|
||||
/// When this database is about to block, send this signal.
|
||||
pub(crate) signal_on_will_block: AtomicCell<usize>,
|
||||
|
||||
/// When this database has set the cancellation flag, send this signal.
|
||||
pub(crate) signal_on_did_cancel: AtomicCell<usize>,
|
||||
}
|
||||
|
||||
#[salsa::db]
|
||||
|
@ -35,8 +38,14 @@ pub(crate) struct Database {
|
|||
#[salsa::db]
|
||||
impl salsa::Database for Database {
|
||||
fn salsa_event(&self, event: salsa::Event) {
|
||||
if let salsa::EventKind::WillBlockOn { .. } = event.kind {
|
||||
self.signal(self.knobs().signal_on_will_block.load());
|
||||
match event.kind {
|
||||
salsa::EventKind::WillBlockOn { .. } => {
|
||||
self.signal(self.knobs().signal_on_will_block.load());
|
||||
}
|
||||
salsa::EventKind::DidSetCancellationFlag => {
|
||||
self.signal(self.knobs().signal_on_did_cancel.load());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue