return durability of modified data and remove SharedStateWriteGuard

Now the `with_incremented_revision` method signature does not
reference the database DB in any way.
This commit is contained in:
Niko Matsakis 2020-07-01 14:12:51 +00:00
parent 705a35d4b7
commit c6663f3dcb
3 changed files with 27 additions and 46 deletions

View file

@ -226,14 +226,16 @@ where
{ {
fn invalidate(&self, db: &mut DB, key: &Q::Key) { fn invalidate(&self, db: &mut DB, key: &Q::Key) {
db.salsa_runtime_mut() db.salsa_runtime_mut()
.with_incremented_revision(&mut |_new_revision, guard| { .with_incremented_revision(&mut |_new_revision| {
let map_read = self.slot_map.read(); let map_read = self.slot_map.read();
if let Some(slot) = map_read.get(key) { if let Some(slot) = map_read.get(key) {
if let Some(durability) = slot.invalidate() { if let Some(durability) = slot.invalidate() {
guard.mark_durability_as_changed(durability); return Some(durability);
} }
} }
None
}) })
} }
} }

View file

@ -193,7 +193,7 @@ where
// case doesn't generally seem worth optimizing for. // case doesn't generally seem worth optimizing for.
let mut value = Some(value); let mut value = Some(value);
db.salsa_runtime_mut() db.salsa_runtime_mut()
.with_incremented_revision(&mut |next_revision, guard| { .with_incremented_revision(&mut |next_revision| {
let mut slots = self.slots.write(); let mut slots = self.slots.write();
// Do this *after* we acquire the lock, so that we are not // Do this *after* we acquire the lock, so that we are not
@ -209,8 +209,9 @@ where
match slots.entry(key.clone()) { match slots.entry(key.clone()) {
Entry::Occupied(entry) => { Entry::Occupied(entry) => {
let mut slot_stamped_value = entry.get().stamped_value.write(); let mut slot_stamped_value = entry.get().stamped_value.write();
guard.mark_durability_as_changed(slot_stamped_value.durability); let old_durability = slot_stamped_value.durability;
*slot_stamped_value = stamped_value; *slot_stamped_value = stamped_value;
Some(old_durability)
} }
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
@ -225,6 +226,7 @@ where
database_key_index, database_key_index,
stamped_value: RwLock::new(stamped_value), stamped_value: RwLock::new(stamped_value),
})); }));
None
} }
} }
}); });

View file

@ -143,9 +143,7 @@ where
/// will block until that snapshot is dropped -- if that snapshot /// will block until that snapshot is dropped -- if that snapshot
/// is owned by the current thread, this could trigger deadlock. /// is owned by the current thread, this could trigger deadlock.
pub fn synthetic_write(&mut self, durability: Durability) { pub fn synthetic_write(&mut self, durability: Durability) {
self.with_incremented_revision(&mut |_next_revision, guard| { self.with_incremented_revision(&mut |_next_revision| Some(durability));
guard.mark_durability_as_changed(durability);
});
} }
/// Default implementation for `Database::sweep_all`. /// Default implementation for `Database::sweep_all`.
@ -282,20 +280,29 @@ where
/// Acquires the **global query write lock** (ensuring that no queries are /// Acquires the **global query write lock** (ensuring that no queries are
/// executing) and then increments the current revision counter; invokes /// executing) and then increments the current revision counter; invokes
/// `op` with the global query write lock still held. The `op` closure is /// `op` with the global query write lock still held.
/// given the new revision as an argument, and it should actually perform
/// the writes.
/// ///
/// While we wait to acquire the global query write lock, this method will /// While we wait to acquire the global query write lock, this method will
/// also increment `pending_revision_increments`, thus signalling to queries /// also increment `pending_revision_increments`, thus signalling to queries
/// that their results are "canceled" and they should abort as expeditiously /// that their results are "canceled" and they should abort as expeditiously
/// as possible. /// as possible.
/// ///
/// The `op` closure should actually perform the writes needed. It is given
/// the new revision as an argument, and its return value indicates whether
/// any pre-existing value was modified:
///
/// - returning `None` means that no pre-existing value was modified (this
/// could occur e.g. when setting some key on an input that was never set
/// before)
/// - returning `Some(d)` indicates that a pre-existing value was modified
/// and it had the durability `d`. This will update the records for when
/// values with each durability were modified.
///
/// Note that, given our writer model, we can assume that only one thread is /// Note that, given our writer model, we can assume that only one thread is
/// attempting to increment the global revision at a time. /// attempting to increment the global revision at a time.
pub(crate) fn with_incremented_revision( pub(crate) fn with_incremented_revision(
&mut self, &mut self,
op: &mut dyn FnMut(Revision, &DatabaseWriteLockGuard<'_, DB>), op: &mut dyn FnMut(Revision) -> Option<Durability>,
) { ) {
log::debug!("increment_revision()"); log::debug!("increment_revision()");
@ -318,13 +325,11 @@ where
debug!("increment_revision: incremented to {:?}", new_revision); debug!("increment_revision: incremented to {:?}", new_revision);
op( if let Some(d) = op(new_revision) {
new_revision, for rev in &self.shared_state.revisions[1..=d.index()] {
&DatabaseWriteLockGuard { rev.store(new_revision);
runtime: self, }
new_revision, }
},
)
} }
pub(crate) fn permits_increment(&self) -> bool { pub(crate) fn permits_increment(&self) -> bool {
@ -528,34 +533,6 @@ where
} }
} }
/// Temporary guard that indicates that the database write-lock is
/// held. You can get one of these by invoking
/// `with_incremented_revision`. It gives access to the new revision
/// and a few other operations that only make sense to do while an
/// update is happening.
pub(crate) struct DatabaseWriteLockGuard<'db, DB>
where
DB: Database,
{
runtime: &'db mut Runtime<DB>,
new_revision: Revision,
}
impl<DB> DatabaseWriteLockGuard<'_, DB>
where
DB: Database,
{
/// Indicates that this update modified an input marked as
/// "constant". This will force re-evaluation of anything that was
/// dependent on constants (which otherwise might not get
/// re-evaluated).
pub(crate) fn mark_durability_as_changed(&self, d: Durability) {
for rev in &self.runtime.shared_state.revisions[1..=d.index()] {
rev.store(self.new_revision);
}
}
}
/// State that will be common to all threads (when we support multiple threads) /// State that will be common to all threads (when we support multiple threads)
struct SharedState<DB: Database> { struct SharedState<DB: Database> {
storage: DB::DatabaseStorage, storage: DB::DatabaseStorage,