mirror of
https://github.com/salsa-rs/salsa.git
synced 2024-10-23 20:59:51 +00:00
preserve both cancellation strategies
This commit is contained in:
parent
f07643d232
commit
61e1d69fb5
3 changed files with 110 additions and 109 deletions
|
@ -230,16 +230,6 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Calls the callback if the current revision is cancelled.
|
||||
///
|
||||
/// Observing cancellation may lead to inconsistencies in database storage,
|
||||
/// so the callback must panic.
|
||||
pub fn if_current_revision_is_canceled(&self, cb: fn() -> !) {
|
||||
if self.pending_revision() > self.current_revision() {
|
||||
cb()
|
||||
}
|
||||
}
|
||||
|
||||
/// Acquires the **global query write lock** (ensuring that no
|
||||
/// queries are executing) and then increments the current
|
||||
/// revision counter; invokes `op` with the global query write
|
||||
|
|
|
@ -4,133 +4,150 @@ use crate::setup::{
|
|||
use salsa::{Database, ParallelDatabase};
|
||||
|
||||
macro_rules! assert_canceled {
|
||||
($thread:expr) => {
|
||||
match $thread.join() {
|
||||
Ok(value) => panic!("expected cancelation, got {:?}", value),
|
||||
Err(payload) => match payload.downcast::<Canceled>() {
|
||||
Ok(_) => {}
|
||||
Err(payload) => ::std::panic::resume_unwind(payload),
|
||||
},
|
||||
($flag:expr, $thread:expr) => {
|
||||
if $flag == CancelationFlag::Panic {
|
||||
match $thread.join() {
|
||||
Ok(value) => panic!("expected cancelation, got {:?}", value),
|
||||
Err(payload) => match payload.downcast::<Canceled>() {
|
||||
Ok(_) => {}
|
||||
Err(payload) => ::std::panic::resume_unwind(payload),
|
||||
},
|
||||
}
|
||||
} else {
|
||||
assert_eq!($thread.join().unwrap(), usize::max_value());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// We have to falvors of cancellation: based on unwindig and based on anon
|
||||
/// reads. This checks both,
|
||||
fn check_cancelation(f: impl Fn(CancelationFlag)) {
|
||||
f(CancelationFlag::Panic);
|
||||
f(CancelationFlag::SpecialValue);
|
||||
}
|
||||
|
||||
/// Add test where a call to `sum` is cancelled by a simultaneous
|
||||
/// write. Check that we recompute the result in next revision, even
|
||||
/// though none of the inputs have changed.
|
||||
#[test]
|
||||
fn in_par_get_set_cancellation_immediate() {
|
||||
let mut db = ParDatabaseImpl::default();
|
||||
check_cancelation(|flag| {
|
||||
let mut db = ParDatabaseImpl::default();
|
||||
|
||||
db.query_mut(Input).set('a', 100);
|
||||
db.query_mut(Input).set('b', 010);
|
||||
db.query_mut(Input).set('c', 001);
|
||||
db.query_mut(Input).set('d', 0);
|
||||
db.query_mut(Input).set('a', 100);
|
||||
db.query_mut(Input).set('b', 010);
|
||||
db.query_mut(Input).set('c', 001);
|
||||
db.query_mut(Input).set('d', 0);
|
||||
|
||||
let thread1 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || {
|
||||
// This will not return until it sees cancellation is
|
||||
// signaled.
|
||||
db.knobs().sum_signal_on_entry.with_value(1, || {
|
||||
db.knobs()
|
||||
.sum_wait_for_cancellation
|
||||
.with_value(CancelationFlag::Panic, || db.sum("abc"))
|
||||
})
|
||||
}
|
||||
});
|
||||
let thread1 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || {
|
||||
// This will not return until it sees cancellation is
|
||||
// signaled.
|
||||
db.knobs().sum_signal_on_entry.with_value(1, || {
|
||||
db.knobs()
|
||||
.sum_wait_for_cancellation
|
||||
.with_value(flag, || db.sum("abc"))
|
||||
})
|
||||
}
|
||||
});
|
||||
|
||||
// Wait until we have entered `sum` in the other thread.
|
||||
db.wait_for(1);
|
||||
// Wait until we have entered `sum` in the other thread.
|
||||
db.wait_for(1);
|
||||
|
||||
// Try to set the input. This will signal cancellation.
|
||||
db.query_mut(Input).set('d', 1000);
|
||||
// Try to set the input. This will signal cancellation.
|
||||
db.query_mut(Input).set('d', 1000);
|
||||
|
||||
// This should re-compute the value (even though no input has changed).
|
||||
let thread2 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || db.sum("abc")
|
||||
});
|
||||
// This should re-compute the value (even though no input has changed).
|
||||
let thread2 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || db.sum("abc")
|
||||
});
|
||||
|
||||
assert_eq!(db.sum("d"), 1000);
|
||||
assert_canceled!(thread1);
|
||||
assert_eq!(thread2.join().unwrap(), 111);
|
||||
assert_eq!(db.sum("d"), 1000);
|
||||
assert_canceled!(flag, thread1);
|
||||
assert_eq!(thread2.join().unwrap(), 111);
|
||||
})
|
||||
}
|
||||
|
||||
/// Here, we check that `sum`'s cancellation is propagated
|
||||
/// to `sum2` properly.
|
||||
#[test]
|
||||
fn in_par_get_set_cancellation_transitive() {
|
||||
let mut db = ParDatabaseImpl::default();
|
||||
check_cancelation(|flag| {
|
||||
let mut db = ParDatabaseImpl::default();
|
||||
|
||||
db.query_mut(Input).set('a', 100);
|
||||
db.query_mut(Input).set('b', 010);
|
||||
db.query_mut(Input).set('c', 001);
|
||||
db.query_mut(Input).set('d', 0);
|
||||
db.query_mut(Input).set('a', 100);
|
||||
db.query_mut(Input).set('b', 010);
|
||||
db.query_mut(Input).set('c', 001);
|
||||
db.query_mut(Input).set('d', 0);
|
||||
|
||||
let thread1 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || {
|
||||
// This will not return until it sees cancellation is
|
||||
// signaled.
|
||||
db.knobs().sum_signal_on_entry.with_value(1, || {
|
||||
db.knobs()
|
||||
.sum_wait_for_cancellation
|
||||
.with_value(CancelationFlag::Panic, || db.sum2("abc"))
|
||||
})
|
||||
}
|
||||
});
|
||||
let thread1 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || {
|
||||
// This will not return until it sees cancellation is
|
||||
// signaled.
|
||||
db.knobs().sum_signal_on_entry.with_value(1, || {
|
||||
db.knobs()
|
||||
.sum_wait_for_cancellation
|
||||
.with_value(flag, || db.sum2("abc"))
|
||||
})
|
||||
}
|
||||
});
|
||||
|
||||
// Wait until we have entered `sum` in the other thread.
|
||||
db.wait_for(1);
|
||||
// Wait until we have entered `sum` in the other thread.
|
||||
db.wait_for(1);
|
||||
|
||||
// Try to set the input. This will signal cancellation.
|
||||
db.query_mut(Input).set('d', 1000);
|
||||
// Try to set the input. This will signal cancellation.
|
||||
db.query_mut(Input).set('d', 1000);
|
||||
|
||||
// This should re-compute the value (even though no input has changed).
|
||||
let thread2 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || db.sum2("abc")
|
||||
});
|
||||
// This should re-compute the value (even though no input has changed).
|
||||
let thread2 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || db.sum2("abc")
|
||||
});
|
||||
|
||||
assert_eq!(db.sum2("d"), 1000);
|
||||
assert_canceled!(thread1);
|
||||
assert_eq!(thread2.join().unwrap(), 111);
|
||||
assert_eq!(db.sum2("d"), 1000);
|
||||
assert_canceled!(flag, thread1);
|
||||
assert_eq!(thread2.join().unwrap(), 111);
|
||||
})
|
||||
}
|
||||
|
||||
/// https://github.com/salsa-rs/salsa/issues/66
|
||||
#[test]
|
||||
fn no_back_dating_in_cancellation() {
|
||||
let mut db = ParDatabaseImpl::default();
|
||||
check_cancelation(|flag| {
|
||||
let mut db = ParDatabaseImpl::default();
|
||||
|
||||
db.query_mut(Input).set('a', 1);
|
||||
let thread1 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || {
|
||||
// Here we compute a long-chain of queries,
|
||||
// but the last one gets cancelled.
|
||||
db.knobs().sum_signal_on_entry.with_value(1, || {
|
||||
db.knobs()
|
||||
.sum_wait_for_cancellation
|
||||
.with_value(CancelationFlag::Panic, || db.sum3("a"))
|
||||
})
|
||||
}
|
||||
});
|
||||
db.query_mut(Input).set('a', 1);
|
||||
let thread1 = std::thread::spawn({
|
||||
let db = db.snapshot();
|
||||
move || {
|
||||
// Here we compute a long-chain of queries,
|
||||
// but the last one gets cancelled.
|
||||
db.knobs().sum_signal_on_entry.with_value(1, || {
|
||||
db.knobs()
|
||||
.sum_wait_for_cancellation
|
||||
.with_value(flag, || db.sum3("a"))
|
||||
})
|
||||
}
|
||||
});
|
||||
|
||||
db.wait_for(1);
|
||||
db.wait_for(1);
|
||||
|
||||
// Set unrelated input to bump revision
|
||||
db.query_mut(Input).set('b', 2);
|
||||
// Set unrelated input to bump revision
|
||||
db.query_mut(Input).set('b', 2);
|
||||
|
||||
// Here we should recompuet the whole chain again, clearing the cancellation
|
||||
// state. If we get `usize::max()` here, it is a bug!
|
||||
assert_eq!(db.sum3("a"), 1);
|
||||
// Here we should recompuet the whole chain again, clearing the cancellation
|
||||
// state. If we get `usize::max()` here, it is a bug!
|
||||
assert_eq!(db.sum3("a"), 1);
|
||||
|
||||
assert_canceled!(thread1);
|
||||
assert_canceled!(flag, thread1);
|
||||
|
||||
db.query_mut(Input).set('a', 3);
|
||||
db.query_mut(Input).set('a', 4);
|
||||
assert_eq!(db.sum3("ab"), 6);
|
||||
db.query_mut(Input).set('a', 3);
|
||||
db.query_mut(Input).set('a', 4);
|
||||
assert_eq!(db.sum3("ab"), 6);
|
||||
})
|
||||
}
|
||||
|
||||
/// Here, we compute `sum3_drop_sum` and -- in the process -- observe
|
||||
|
@ -138,6 +155,7 @@ fn no_back_dating_in_cancellation() {
|
|||
/// reinvoke `sum3_drop_sum` and we have to re-execute
|
||||
/// `sum2_drop_sum`. But the result of `sum2_drop_sum` doesn't
|
||||
/// change, so we don't have to re-execute `sum3_drop_sum`.
|
||||
/// This only works with SpecialValue cancellation strategy.
|
||||
#[test]
|
||||
fn transitive_cancellation() {
|
||||
let mut db = ParDatabaseImpl::default();
|
||||
|
|
|
@ -143,21 +143,14 @@ fn sum(db: &impl ParDatabase, key: &'static str) -> usize {
|
|||
|
||||
match db.knobs().sum_wait_for_cancellation.get() {
|
||||
CancelationFlag::Down => (),
|
||||
CancelationFlag::SpecialValue => {
|
||||
flag => {
|
||||
log::debug!("waiting for cancellation");
|
||||
while !db.salsa_runtime().is_current_revision_canceled() {
|
||||
std::thread::yield_now();
|
||||
}
|
||||
log::debug!("observed cancelation");
|
||||
}
|
||||
CancelationFlag::Panic => {
|
||||
log::debug!("waiting for cancellation");
|
||||
loop {
|
||||
db.salsa_runtime().if_current_revision_is_canceled(|| {
|
||||
log::debug!("observed cancelation");
|
||||
Canceled::throw()
|
||||
});
|
||||
std::thread::yield_now();
|
||||
if flag == CancelationFlag::Panic {
|
||||
Canceled::throw();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue