mirror of
https://github.com/salsa-rs/salsa.git
synced 2024-12-27 06:27:45 +00:00
panic when recovering from a cycle
It turns out this is necessary, as this test reveals! If we don't panic, you might encounter further cycles that aren't supposed to have executed. (Prior to these changes, this test was panicking from the second cycle.)
This commit is contained in:
parent
1027342468
commit
3f95c0b4a0
8 changed files with 156 additions and 85 deletions
|
@ -174,7 +174,11 @@ where
|
|||
}
|
||||
|
||||
db.salsa_runtime()
|
||||
.report_query_read(slot.database_key_index(), durability, changed_at);
|
||||
.report_query_read_and_panic_if_cycle_resulted(
|
||||
slot.database_key_index(),
|
||||
durability,
|
||||
changed_at,
|
||||
);
|
||||
|
||||
value
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ use crate::lru::LruNode;
|
|||
use crate::plumbing::{CycleDetected, CycleRecoveryStrategy};
|
||||
use crate::plumbing::{DatabaseOps, QueryFunction};
|
||||
use crate::revision::Revision;
|
||||
use crate::runtime::cycle_participant::CycleParticipant;
|
||||
use crate::runtime::local_state::ActiveQueryGuard;
|
||||
use crate::runtime::local_state::QueryInputs;
|
||||
use crate::runtime::local_state::QueryRevisions;
|
||||
|
@ -220,18 +221,25 @@ where
|
|||
active_query: ActiveQueryGuard<'_>,
|
||||
mut panic_guard: PanicGuard<'_, Q, MP>,
|
||||
) -> StampedValue<Q::Value> {
|
||||
log::info!("{:?}: executing query", self.database_key_index.debug(db));
|
||||
|
||||
db.salsa_event(Event {
|
||||
runtime_id: db.salsa_runtime().id(),
|
||||
kind: EventKind::WillExecute {
|
||||
database_key: self.database_key_index,
|
||||
},
|
||||
});
|
||||
|
||||
// Query was not previously executed, or value is potentially
|
||||
// stale, or value is absent. Let's execute!
|
||||
let mut result = active_query.pop_and_execute(db, || Q::execute(db, self.key.clone()));
|
||||
let value = CycleParticipant::recover(
|
||||
|| Q::execute(db, self.key.clone()),
|
||||
// If a recoverable cycle occurs, `Q::execute` will throw
|
||||
// and this closure will be executed with the cycle information.
|
||||
|cycle| Q::cycle_fallback(db, &cycle, &self.key),
|
||||
);
|
||||
|
||||
// Subtle: if we were a participant in a cycle, and we have "fallback" cycle recovery,
|
||||
// then we need to overwrite the returned value with the fallback value so that our callers
|
||||
// do not observe the actual value we returned (which is not valid). It's important that
|
||||
// we ignore the actual value that was returned because otherwise it is easy to have
|
||||
// "recovery" where the final value is dependent on which node started the cycle.
|
||||
if let Some(cycle) = &result.cycle_participant {
|
||||
result.value = Q::cycle_fallback(db, cycle, &self.key);
|
||||
}
|
||||
let mut revisions = active_query.pop();
|
||||
|
||||
// We assume that query is side-effect free -- that is, does
|
||||
// not mutate the "inputs" to the query system. Sanity check
|
||||
|
@ -252,27 +260,27 @@ where
|
|||
// used to be, that is a "breaking change" that our
|
||||
// consumers must be aware of. Becoming *more* durable
|
||||
// is not. See the test `constant_to_non_constant`.
|
||||
if result.revisions.durability >= old_memo.revisions.durability
|
||||
&& MP::memoized_value_eq(old_value, &result.value)
|
||||
if revisions.durability >= old_memo.revisions.durability
|
||||
&& MP::memoized_value_eq(old_value, &value)
|
||||
{
|
||||
debug!(
|
||||
"read_upgrade({:?}): value is equal, back-dating to {:?}",
|
||||
self, old_memo.revisions.changed_at,
|
||||
);
|
||||
|
||||
assert!(old_memo.revisions.changed_at <= result.revisions.changed_at);
|
||||
result.revisions.changed_at = old_memo.revisions.changed_at;
|
||||
assert!(old_memo.revisions.changed_at <= revisions.changed_at);
|
||||
revisions.changed_at = old_memo.revisions.changed_at;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let new_value = StampedValue {
|
||||
value: result.value,
|
||||
durability: result.revisions.durability,
|
||||
changed_at: result.revisions.changed_at,
|
||||
value: value,
|
||||
durability: revisions.durability,
|
||||
changed_at: revisions.changed_at,
|
||||
};
|
||||
|
||||
let value = if self.should_memoize_value(&self.key) {
|
||||
let memo_value = if self.should_memoize_value(&self.key) {
|
||||
Some(new_value.value.clone())
|
||||
} else {
|
||||
None
|
||||
|
@ -280,13 +288,13 @@ where
|
|||
|
||||
debug!(
|
||||
"read_upgrade({:?}): result.revisions = {:#?}",
|
||||
self, result.revisions,
|
||||
self, revisions,
|
||||
);
|
||||
|
||||
panic_guard.memo = Some(Memo {
|
||||
value,
|
||||
value: memo_value,
|
||||
verified_at: revision_now,
|
||||
revisions: result.revisions,
|
||||
revisions: revisions,
|
||||
});
|
||||
|
||||
panic_guard.proceed();
|
||||
|
|
|
@ -110,7 +110,11 @@ where
|
|||
} = slot.stamped_value.read().clone();
|
||||
|
||||
db.salsa_runtime()
|
||||
.report_query_read(slot.database_key_index, durability, changed_at);
|
||||
.report_query_read_and_panic_if_cycle_resulted(
|
||||
slot.database_key_index,
|
||||
durability,
|
||||
changed_at,
|
||||
);
|
||||
|
||||
value
|
||||
}
|
||||
|
|
|
@ -232,11 +232,12 @@ where
|
|||
let slot = self.intern_index(db, key);
|
||||
let changed_at = slot.interned_at;
|
||||
let index = slot.index;
|
||||
db.salsa_runtime().report_query_read(
|
||||
slot.database_key_index,
|
||||
INTERN_DURABILITY,
|
||||
changed_at,
|
||||
);
|
||||
db.salsa_runtime()
|
||||
.report_query_read_and_panic_if_cycle_resulted(
|
||||
slot.database_key_index,
|
||||
INTERN_DURABILITY,
|
||||
changed_at,
|
||||
);
|
||||
<Q::Value>::from_intern_id(index)
|
||||
}
|
||||
|
||||
|
@ -350,11 +351,12 @@ where
|
|||
let slot = interned_storage.lookup_value(index);
|
||||
let value = slot.value.clone();
|
||||
let interned_at = slot.interned_at;
|
||||
db.salsa_runtime().report_query_read(
|
||||
slot.database_key_index,
|
||||
INTERN_DURABILITY,
|
||||
interned_at,
|
||||
);
|
||||
db.salsa_runtime()
|
||||
.report_query_read_and_panic_if_cycle_resulted(
|
||||
slot.database_key_index,
|
||||
INTERN_DURABILITY,
|
||||
interned_at,
|
||||
);
|
||||
value
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,8 @@ use std::sync::Arc;
|
|||
pub(crate) type FxIndexSet<K> = indexmap::IndexSet<K, BuildHasherDefault<FxHasher>>;
|
||||
pub(crate) type FxIndexMap<K, V> = indexmap::IndexMap<K, V, BuildHasherDefault<FxHasher>>;
|
||||
|
||||
pub(crate) mod cycle_participant;
|
||||
|
||||
mod dependency_graph;
|
||||
use dependency_graph::DependencyGraph;
|
||||
|
||||
|
@ -219,19 +221,23 @@ impl Runtime {
|
|||
/// Reports that the currently active query read the result from
|
||||
/// another query.
|
||||
///
|
||||
/// Also checks whether the "cycle participant" flag is set on
|
||||
/// the current stack frame -- if so, panics with `CycleParticipant`
|
||||
/// value, which should be caught by the code executing the query.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// - `database_key`: the query whose result was read
|
||||
/// - `changed_revision`: the last revision in which the result of that
|
||||
/// query had changed
|
||||
pub(crate) fn report_query_read(
|
||||
pub(crate) fn report_query_read_and_panic_if_cycle_resulted(
|
||||
&self,
|
||||
input: DatabaseKeyIndex,
|
||||
durability: Durability,
|
||||
changed_at: Revision,
|
||||
) {
|
||||
self.local_state
|
||||
.report_query_read(input, durability, changed_at);
|
||||
.report_query_read_and_panic_if_cycle_resulted(input, durability, changed_at);
|
||||
}
|
||||
|
||||
/// Reports that the query depends on some state unknown to salsa.
|
||||
|
|
26
src/runtime/cycle_participant.rs
Normal file
26
src/runtime/cycle_participant.rs
Normal file
|
@ -0,0 +1,26 @@
|
|||
use std::panic::AssertUnwindSafe;
|
||||
|
||||
use crate::Cycle;
|
||||
|
||||
pub(crate) struct CycleParticipant {
|
||||
cycle: Cycle,
|
||||
}
|
||||
|
||||
impl CycleParticipant {
|
||||
pub(crate) fn new(cycle: Cycle) -> Self {
|
||||
Self { cycle }
|
||||
}
|
||||
|
||||
pub(crate) fn throw(self) {
|
||||
std::panic::resume_unwind(Box::new(self));
|
||||
}
|
||||
|
||||
pub(crate) fn recover<T>(execute: impl FnOnce() -> T, recover: impl FnOnce(Cycle) -> T) -> T {
|
||||
std::panic::catch_unwind(AssertUnwindSafe(execute)).unwrap_or_else(|err| {
|
||||
match err.downcast::<CycleParticipant>() {
|
||||
Ok(participant) => recover(participant.cycle),
|
||||
Err(v) => std::panic::resume_unwind(v),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,14 +1,12 @@
|
|||
use crate::durability::Durability;
|
||||
use crate::runtime::ActiveQuery;
|
||||
use crate::runtime::Revision;
|
||||
use crate::Cycle;
|
||||
use crate::Database;
|
||||
use crate::DatabaseKeyIndex;
|
||||
use crate::Event;
|
||||
use crate::EventKind;
|
||||
use std::cell::RefCell;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::cycle_participant::CycleParticipant;
|
||||
|
||||
/// State that is specific to a single execution thread.
|
||||
///
|
||||
/// Internally, this type uses ref-cells.
|
||||
|
@ -26,19 +24,6 @@ pub(super) struct LocalState {
|
|||
query_stack: RefCell<Option<Vec<ActiveQuery>>>,
|
||||
}
|
||||
|
||||
pub(crate) struct ComputedQueryResult<V> {
|
||||
/// Final value produced
|
||||
pub(crate) value: V,
|
||||
|
||||
/// Information about the other queries that were
|
||||
/// accessed.
|
||||
pub(crate) revisions: QueryRevisions,
|
||||
|
||||
/// If this node participated in a cycle, then this value is set
|
||||
/// to the cycle in which it participated.
|
||||
pub(crate) cycle_participant: Option<Cycle>,
|
||||
}
|
||||
|
||||
/// Summarizes "all the inputs that a query used"
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct QueryRevisions {
|
||||
|
@ -106,7 +91,7 @@ impl LocalState {
|
|||
})
|
||||
}
|
||||
|
||||
pub(super) fn report_query_read(
|
||||
pub(super) fn report_query_read_and_panic_if_cycle_resulted(
|
||||
&self,
|
||||
input: DatabaseKeyIndex,
|
||||
durability: Durability,
|
||||
|
@ -115,6 +100,10 @@ impl LocalState {
|
|||
self.with_query_stack(|stack| {
|
||||
if let Some(top_query) = stack.last_mut() {
|
||||
top_query.add_read(input, durability, changed_at);
|
||||
|
||||
if let Some(cycle) = top_query.cycle.take() {
|
||||
CycleParticipant::new(cycle).throw()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -188,44 +177,18 @@ impl ActiveQueryGuard<'_> {
|
|||
query
|
||||
}
|
||||
|
||||
/// As the final action from a pushed query, you can
|
||||
/// execute the query implementation. This invokes the
|
||||
/// given closure and then returns the "computed query result",
|
||||
/// which includes the returned value as well as dependency
|
||||
/// and cycle information.
|
||||
///
|
||||
/// Executing this method pops the query from the stack.
|
||||
/// Pops an active query from the stack. Returns the [`QueryRevisions`]
|
||||
/// which summarizes the other queries that were accessed during this
|
||||
/// query's execution.
|
||||
#[inline]
|
||||
pub(crate) fn pop_and_execute<DB, V>(
|
||||
self,
|
||||
db: &DB,
|
||||
execute: impl FnOnce() -> V,
|
||||
) -> ComputedQueryResult<V>
|
||||
where
|
||||
DB: ?Sized + Database,
|
||||
{
|
||||
log::info!("{:?}: executing query", self.database_key_index);
|
||||
|
||||
db.salsa_event(Event {
|
||||
runtime_id: db.salsa_runtime().id(),
|
||||
kind: EventKind::WillExecute {
|
||||
database_key: self.database_key_index,
|
||||
},
|
||||
});
|
||||
|
||||
// Execute user's code, accumulating inputs etc.
|
||||
let value = execute();
|
||||
|
||||
pub(crate) fn pop(self) -> QueryRevisions {
|
||||
// Extract accumulated inputs.
|
||||
let popped_query = self.complete();
|
||||
|
||||
let revisions = popped_query.revisions();
|
||||
// If this frame were a cycle participant, it would have unwound.
|
||||
assert!(popped_query.cycle.is_none());
|
||||
|
||||
ComputedQueryResult {
|
||||
value,
|
||||
revisions,
|
||||
cycle_participant: popped_query.cycle,
|
||||
}
|
||||
popped_query.revisions()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -89,6 +89,7 @@ enum CycleQuery {
|
|||
A,
|
||||
B,
|
||||
C,
|
||||
AthenC,
|
||||
}
|
||||
|
||||
#[salsa::query_group(GroupStruct)]
|
||||
|
@ -153,20 +154,27 @@ impl CycleQuery {
|
|||
CycleQuery::A => db.cycle_a(),
|
||||
CycleQuery::B => db.cycle_b(),
|
||||
CycleQuery::C => db.cycle_c(),
|
||||
CycleQuery::AthenC => {
|
||||
let _ = db.cycle_a();
|
||||
db.cycle_c()
|
||||
}
|
||||
CycleQuery::None => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn cycle_a(db: &dyn Database) -> Result<(), Error> {
|
||||
dbg!("cycle_a");
|
||||
db.a_invokes().invoke(db)
|
||||
}
|
||||
|
||||
fn cycle_b(db: &dyn Database) -> Result<(), Error> {
|
||||
dbg!("cycle_b");
|
||||
db.b_invokes().invoke(db)
|
||||
}
|
||||
|
||||
fn cycle_c(db: &dyn Database) -> Result<(), Error> {
|
||||
dbg!("cycle_c");
|
||||
db.c_invokes().invoke(db)
|
||||
}
|
||||
|
||||
|
@ -334,3 +342,53 @@ fn cycle_deterministic_order() {
|
|||
)
|
||||
"###);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cycle_multiple() {
|
||||
// No matter whether we start from A or B, we get the same set of participants:
|
||||
let mut db = DatabaseImpl::default();
|
||||
|
||||
// Configuration:
|
||||
//
|
||||
// A --> B <-- C
|
||||
// ^ | ^
|
||||
// +-----+ |
|
||||
// | |
|
||||
// +-----+
|
||||
//
|
||||
// Here, conceptually, B encounters a cycle with A and then
|
||||
// recovers.
|
||||
|
||||
db.set_b_invokes(CycleQuery::AthenC);
|
||||
let c = db.cycle_c();
|
||||
let b = db.cycle_b();
|
||||
let a = db.cycle_a();
|
||||
insta::assert_debug_snapshot!((a, b, c), @r###"
|
||||
(
|
||||
Err(
|
||||
Error {
|
||||
cycle: [
|
||||
"cycle_a(())",
|
||||
"cycle_b(())",
|
||||
],
|
||||
},
|
||||
),
|
||||
Err(
|
||||
Error {
|
||||
cycle: [
|
||||
"cycle_a(())",
|
||||
"cycle_b(())",
|
||||
],
|
||||
},
|
||||
),
|
||||
Err(
|
||||
Error {
|
||||
cycle: [
|
||||
"cycle_a(())",
|
||||
"cycle_b(())",
|
||||
],
|
||||
},
|
||||
),
|
||||
)
|
||||
"###);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue