diff --git a/Cargo.toml b/Cargo.toml index b78d45f2..69be0645 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ rustc-hash = "1.0" parking_lot = "0.6.4" indexmap = "1.0.1" log = "0.4.5" +smallvec = "0.6.5" [dev-dependencies] difference = "2.0" diff --git a/src/runtime.rs b/src/runtime.rs index c74ed72f..e5333f15 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,7 +1,8 @@ use crate::Database; use log::debug; -use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard}; -use rustc_hash::FxHasher; +use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard}; +use rustc_hash::{FxHashMap, FxHasher}; +use smallvec::SmallVec; use std::cell::RefCell; use std::fmt::Write; use std::hash::BuildHasherDefault; @@ -9,7 +10,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; type FxIndexSet = indexmap::IndexSet>; -type FxIndexMap = indexmap::IndexMap>; /// The salsa runtime stores the storage for all queries as well as /// tracking the query stack and dependencies between cycles. @@ -261,6 +261,19 @@ where } panic!(message) } + + /// Try to make this runtime blocked on `other_id`. Returns true + /// upon success or false if `other_id` is already blocked on us. + pub(crate) fn try_block_on( + &self, + descriptor: &DB::QueryDescriptor, + other_id: RuntimeId, + ) -> bool { + self.shared_state + .dependency_graph + .lock() + .add_edge(self.id(), descriptor, other_id) + } } /// State that will be common to all threads (when we support multiple threads) @@ -294,6 +307,10 @@ struct SharedState { /// "short-circuit" their results if they learn that. See /// `is_current_revision_canceled` for more information. pending_revision_increments: AtomicUsize, + + /// The dependency graph tracks which runtimes are blocked on one + /// another, waiting for queries to terminate. + dependency_graph: Mutex>, } impl Default for SharedState { @@ -303,6 +320,7 @@ impl Default for SharedState { storage: Default::default(), query_lock: Default::default(), revision: Default::default(), + dependency_graph: Default::default(), pending_revision_increments: Default::default(), } } @@ -361,7 +379,7 @@ impl ActiveQuery { } } -#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] pub(crate) struct RuntimeId { counter: usize, } @@ -462,3 +480,64 @@ pub(crate) struct StampedValue { pub(crate) value: V, pub(crate) changed_at: ChangedAt, } + +struct DependencyGraph { + /// A `(K -> V)` pair in this map indicates that the the runtime + /// `K` is blocked on some query executing in the runtime `V`. + /// This encodes a graph that must be acyclic (or else deadlock + /// will result). + edges: FxHashMap, + labels: FxHashMap>, +} + +impl Default for DependencyGraph { + fn default() -> Self { + DependencyGraph { + edges: Default::default(), + labels: Default::default(), + } + } +} + +impl DependencyGraph { + /// Attempt to add an edge `from_id -> to_id` into the result graph. + fn add_edge( + &mut self, + from_id: RuntimeId, + descriptor: &DB::QueryDescriptor, + to_id: RuntimeId, + ) -> bool { + assert_ne!(from_id, to_id); + debug_assert!(!self.edges.contains_key(&from_id)); + + // First: walk the chain of things that `to_id` depends on, + // looking for us. + let mut p = to_id; + while let Some(&q) = self.edges.get(&p) { + if q == from_id { + return false; + } + + p = q; + } + + self.edges.insert(from_id, to_id); + self.labels + .entry(descriptor.clone()) + .or_insert(SmallVec::default()) + .push(from_id); + true + } + + fn remove_edge(&mut self, descriptor: &DB::QueryDescriptor, to_id: RuntimeId) { + let vec = self + .labels + .remove(descriptor) + .unwrap_or(SmallVec::default()); + + for from_id in &vec { + let to_id1 = self.edges.remove(from_id); + assert_eq!(Some(to_id), to_id1); + } + } +}