diff --git a/Cargo.toml b/Cargo.toml index 42989fb6..33e83f24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ description = "A generic framework for on-demand, incrementalized computation (e readme = "README.md" [dependencies] +crossbeam = "0.7.1" derive-new = "0.5.5" indexmap = "1.0.1" lock_api = "0.2.0" diff --git a/src/interned.rs b/src/interned.rs index 145fc61a..d82494ba 100644 --- a/src/interned.rs +++ b/src/interned.rs @@ -6,14 +6,16 @@ use crate::plumbing::QueryStorageMassOps; use crate::plumbing::QueryStorageOps; use crate::runtime::ChangedAt; use crate::runtime::Revision; -use crate::runtime::StampedValue; use crate::Query; use crate::{Database, DiscardIf, SweepStrategy}; +use crossbeam::atomic::AtomicCell; use parking_lot::RwLock; use rustc_hash::FxHashMap; use std::collections::hash_map::Entry; use std::convert::From; +use std::fmt::Debug; use std::hash::Hash; +use std::sync::Arc; /// Handles storage where the value is 'derived' by executing a /// function (in contrast to "inputs"). @@ -81,24 +83,39 @@ impl InternKey for InternId { enum InternValue { /// The value has not been gc'd. - Present { - value: K, - - /// When was this intern'd? - /// - /// (This informs the "changed-at" result) - interned_at: Revision, - - /// When was it accessed? - /// - /// (This informs the garbage collector) - accessed_at: Revision, - }, + Present { slot: Arc> }, /// Free-list -- the index is the next Free { next: Option }, } +#[derive(Debug)] +struct Slot { + /// Index of this slot in the list of interned values; + /// set to None if gc'd. + index: InternId, + + /// Value that was interned. + value: K, + + /// When was this intern'd? + /// + /// (This informs the "changed-at" result) + interned_at: Revision, + + /// When was it accessed? Equal to `None` if this slot has + /// been garbage collected. + /// + /// This has a subtle interaction with the garbage + /// collector. First, we will never GC anything accessed in the + /// current revision. + /// + /// To protect a slot from being GC'd, we can therefore update the + /// `accessed_at` field to `Some(revision_now)` before releasing + /// the read-lock on our interning tables. + accessed_at: AtomicCell>, +} + impl std::panic::RefUnwindSafe for InternedStorage where Q: Query, @@ -145,6 +162,41 @@ where } } +impl InternTables { + /// Returns the slot for the given key. + /// + /// The slot will have its "accessed at" field updated to its current revision, + /// ensuring that it cannot be GC'd until the current queries complete. + fn slot_for_key(&self, key: &K, revision_now: Revision) -> Option>> { + let index = self.map.get(key)?; + Some(self.slot_for_index(*index, revision_now)) + } + + /// Returns the slot at the given index. + /// + /// The slot will have its "accessed at" field updated to its current revision, + /// ensuring that it cannot be GC'd until the current queries complete. + fn slot_for_index(&self, index: InternId, revision_now: Revision) -> Arc> { + match &self.values[index.as_usize()] { + InternValue::Present { slot } => { + // Subtle: we must update the "accessed at" to the + // current revision *while the lock is held* to + // prevent this slot from being GC'd. + let updated = slot.try_update_accessed_at(revision_now); + assert!( + updated, + "failed to update slot {:?} while holding read lock", + slot + ); + slot.clone() + } + InternValue::Free { .. } => { + panic!("index {:?} is free but should not be", index); + } + } + } +} + impl Default for InternTables where K: Eq + Hash, @@ -165,7 +217,12 @@ where Q::Value: InternKey, DB: Database, { - fn intern_index(&self, db: &DB, key: &Q::Key) -> StampedValue { + /// If `key` has already been interned, returns its slot. Otherwise, creates a new slot. + /// + /// In either case, the `accessed_at` field of the slot is updated + /// to the current revision, ensuring that the slot cannot be GC'd + /// while the current queries execute. + fn intern_index(&self, db: &DB, key: &Q::Key) -> Arc> { if let Some(i) = self.intern_check(db, key) { return i; } @@ -180,23 +237,15 @@ where Entry::Vacant(entry) => entry, Entry::Occupied(entry) => { // Somebody inserted this key while we were waiting - // for the write lock. + // for the write lock. In this case, we don't need to + // update the `accessed_at` field because they should + // have already done so! let index = *entry.get(); match &tables.values[index.as_usize()] { - InternValue::Present { - value, - interned_at, - accessed_at, - } => { - debug_assert_eq!(owned_key2, *value); - debug_assert_eq!(*accessed_at, revision_now); - return StampedValue { - value: index, - changed_at: ChangedAt { - is_constant: false, - revision: *interned_at, - }, - }; + InternValue::Present { slot } => { + debug_assert_eq!(owned_key2, slot.value); + debug_assert_eq!(slot.accessed_at.load(), Some(revision_now)); + return slot.clone(); } InternValue::Free { .. } => { @@ -206,179 +255,60 @@ where } }; - let index = match tables.first_free { + let create_slot = |index: InternId| { + Arc::new(Slot { + index, + value: owned_key2, + interned_at: revision_now, + accessed_at: AtomicCell::new(Some(revision_now)), + }) + }; + + let (slot, index); + match tables.first_free { None => { - let index = InternId::from(tables.values.len()); - tables.values.push(InternValue::Present { - value: owned_key2, - interned_at: revision_now, - accessed_at: revision_now, - }); - index + index = InternId::from(tables.values.len()); + slot = create_slot(index); + tables + .values + .push(InternValue::Present { slot: slot.clone() }); } Some(i) => { + index = i; + slot = create_slot(index); + let next_free = match &tables.values[i.as_usize()] { InternValue::Free { next } => *next, - InternValue::Present { value, .. } => { + InternValue::Present { slot } => { panic!( "index {:?} was supposed to be free but contains {:?}", - i, value + i, slot.value ); } }; - tables.values[i.as_usize()] = InternValue::Present { - value: owned_key2, - interned_at: revision_now, - accessed_at: revision_now, - }; + tables.values[index.as_usize()] = InternValue::Present { slot: slot.clone() }; tables.first_free = next_free; - i } - }; + } entry.insert(index); - StampedValue { - value: index, - changed_at: ChangedAt { - is_constant: false, - revision: revision_now, - }, - } + slot } - fn intern_check(&self, db: &DB, key: &Q::Key) -> Option> { + fn intern_check(&self, db: &DB, key: &Q::Key) -> Option>> { let revision_now = db.salsa_runtime().current_revision(); - - // First, try with read lock -- this only works if `accessed_at` is up to date. - { - let tables = self.tables.read(); - let &index = tables.map.get(key)?; - match &tables.values[index.as_usize()] { - InternValue::Present { - interned_at, - accessed_at, - .. - } => { - if *accessed_at == revision_now { - return Some(StampedValue { - value: index, - changed_at: ChangedAt { - is_constant: false, - revision: *interned_at, - }, - }); - } - } - - InternValue::Free { .. } => { - panic!( - "key {:?} maps to index {:?} is free but should not be", - key, index - ); - } - } - } - - // Acquire write lock if necessary. - let mut tables = self.tables.write(); - let &index = tables.map.get(key)?; - match &mut tables.values[index.as_usize()] { - InternValue::Present { - interned_at, - accessed_at, - .. - } => { - *accessed_at = revision_now; - - return Some(StampedValue { - value: index, - changed_at: ChangedAt { - is_constant: false, - revision: *interned_at, - }, - }); - } - - InternValue::Free { .. } => { - panic!( - "key {:?} maps to index {:?} is free but should not be", - key, index - ); - } - } + let slot = self.tables.read().slot_for_key(key, revision_now)?; + Some(slot) } /// Given an index, lookup and clone its value, updating the /// `accessed_at` time if necessary. - fn lookup_value( - &self, - db: &DB, - index: InternId, - op: impl FnOnce(&Q::Key) -> R, - ) -> StampedValue { - let index = index.as_usize(); + fn lookup_value(&self, db: &DB, index: InternId) -> Arc> { let revision_now = db.salsa_runtime().current_revision(); - - { - let tables = self.tables.read(); - debug_assert!( - index < tables.values.len(), - "interned key ``{:?}({})` is out of bounds", - Q::default(), - index, - ); - match &tables.values[index] { - InternValue::Present { - accessed_at, - interned_at, - value, - } => { - if *accessed_at == revision_now { - return StampedValue { - value: op(value), - changed_at: ChangedAt { - is_constant: false, - revision: *interned_at, - }, - }; - } - } - - InternValue::Free { .. } => panic!( - "interned key `{:?}({})` has been garbage collected", - Q::default(), - index, - ), - } - } - - let mut tables = self.tables.write(); - match &mut tables.values[index] { - InternValue::Present { - accessed_at, - interned_at, - value, - } => { - *accessed_at = revision_now; - - return StampedValue { - value: op(value), - changed_at: ChangedAt { - is_constant: false, - revision: *interned_at, - }, - }; - } - - InternValue::Free { .. } => panic!( - "interned key `{:?}({})` has been garbage collected", - Q::default(), - index, - ), - } + self.tables.read().slot_for_index(index, revision_now) } } @@ -394,12 +324,17 @@ where key: &Q::Key, database_key: &DB::DatabaseKey, ) -> Result { - let StampedValue { value, changed_at } = self.intern_index(db, key); - - db.salsa_runtime() - .report_query_read(database_key, changed_at); - - Ok(::from_intern_id(value)) + let slot = self.intern_index(db, key); + let changed_at = slot.interned_at; + let index = slot.index; + db.salsa_runtime().report_query_read( + database_key, + ChangedAt { + is_constant: false, + revision: changed_at, + }, + ); + Ok(::from_intern_id(index)) } fn maybe_changed_since( @@ -410,10 +345,7 @@ where _database_key: &DB::DatabaseKey, ) -> bool { match self.intern_check(db, key) { - Some(StampedValue { - value: _, - changed_at, - }) => changed_at.changed_since(revision), + Some(slot) => slot.interned_at > revision, None => true, } } @@ -452,8 +384,8 @@ where first_free, } = &mut *tables; map.retain(|key, intern_index| { - let discard = match strategy.discard_if { - DiscardIf::Never => false, + match strategy.discard_if { + DiscardIf::Never => true, // NB: Interned keys *never* discard keys unless they // are outdated, regardless of the sweep strategy. This is @@ -467,8 +399,17 @@ where // revision don't have this problem. Anything // dependent on them would regard itself as dirty if // they are removed and also be forced to re-execute. - DiscardIf::Always | DiscardIf::Outdated => match values[intern_index.as_usize()] { - InternValue::Present { accessed_at, .. } => accessed_at < revision_now, + DiscardIf::Always | DiscardIf::Outdated => match &values[intern_index.as_usize()] { + InternValue::Present { slot, .. } => { + if slot.try_collect(revision_now) { + values[intern_index.as_usize()] = + InternValue::Free { next: *first_free }; + *first_free = Some(*intern_index); + false + } else { + true + } + } InternValue::Free { .. } => { panic!( @@ -477,14 +418,7 @@ where ); } }, - }; - - if discard { - values[intern_index.as_usize()] = InternValue::Free { next: *first_free }; - *first_free = Some(*intern_index); } - - !discard }); } } @@ -515,12 +449,14 @@ where let group_storage = >::group_storage(db); let interned_storage = IQ::query_storage(group_storage); - let StampedValue { value, changed_at } = - interned_storage.lookup_value(db, index, Clone::clone); - + let slot = interned_storage.lookup_value(db, index); + let changed_at = ChangedAt { + is_constant: false, + revision: slot.interned_at, + }; + let value = slot.value.clone(); db.salsa_runtime() .report_query_read(database_key, changed_at); - Ok(value) } @@ -555,12 +491,8 @@ where let group_storage = >::group_storage(db); let interned_storage = IQ::query_storage(group_storage); - let StampedValue { - value: (), - changed_at, - } = interned_storage.lookup_value(db, index, |_| ()); - - changed_at.changed_since(revision) + let slot = interned_storage.lookup_value(db, index); + slot.interned_at > revision } fn is_constant(&self, _db: &DB, _key: &Q::Key, _database_key: &DB::DatabaseKey) -> bool { @@ -601,3 +533,54 @@ where { fn sweep(&self, _db: &DB, _strategy: SweepStrategy) {} } + +impl Slot { + /// Updates the `accessed_at` time to be `revision_now` (if + /// necessary). Returns true if the update was successful, or + /// false if the slot has been GC'd in the interim. + fn try_update_accessed_at(&self, revision_now: Revision) -> bool { + if let Some(accessed_at) = self.accessed_at.load() { + match self + .accessed_at + .compare_exchange(Some(accessed_at), Some(revision_now)) + { + Ok(_) => true, + Err(Some(r)) => { + // Somebody was racing with us to update the field -- but they + // also updated it to revision now, so that's cool. + debug_assert_eq!(r, revision_now); + true + } + Err(None) => { + // The garbage collector was racing with us and it swept this + // slot before we could mark it as accessed. + false + } + } + } else { + false + } + } + + /// Invoked during sweeping to try and collect this slot. Fails if + /// the slot has been accessed in the current revision. Note that + /// this access could be racing with the attempt to collect (in + /// particular, when verifying dependencies). + fn try_collect(&self, revision_now: Revision) -> bool { + let accessed_at = self.accessed_at.load().unwrap(); + if accessed_at < revision_now { + match self.accessed_at.compare_exchange(Some(accessed_at), None) { + Ok(_) => true, + Err(r) => { + // The only one racing with us can be a + // verification attempt, which will always bump + // `accessed_at` to the current revision. + debug_assert_eq!(r, Some(revision_now)); + false + } + } + } else { + false + } + } +}