mirror of
https://github.com/salsa-rs/salsa.git
synced 2025-01-12 16:35:21 +00:00
remove use of upgradable_read
from input queries
This commit is contained in:
parent
f4c00cfe97
commit
39dd71ff66
2 changed files with 45 additions and 44 deletions
68
src/input.rs
68
src/input.rs
|
@ -10,7 +10,7 @@ use crate::Database;
|
|||
use crate::Query;
|
||||
use crate::SweepStrategy;
|
||||
use log::debug;
|
||||
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
|
||||
use parking_lot::RwLock;
|
||||
use rustc_hash::FxHashMap;
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
|
@ -63,53 +63,47 @@ where
|
|||
fn set_common(&self, db: &DB, key: &Q::Key, value: Q::Value, is_constant: IsConstant) {
|
||||
let key = key.clone();
|
||||
|
||||
// This upgradable read just serves to gate against other
|
||||
// people invoking `set`; we should probably remove it and
|
||||
// instead acquire the query-write-lock, but that would
|
||||
// require refactoring the `increment_revision` API a bit.
|
||||
let map = self.map.upgradable_read();
|
||||
|
||||
// The value is changing, so even if we are setting this to a
|
||||
// constant, we still need a new revision.
|
||||
//
|
||||
// CAREFUL: This will block until the global revision lock can
|
||||
// be acquired. If there are still queries executing, they may
|
||||
// need to read from this input. Therefore, we do not upgrade
|
||||
// our lock (which would prevent them from reading) until
|
||||
// `increment_revision` has finished.
|
||||
let next_revision = db.salsa_runtime().increment_revision();
|
||||
// need to read from this input. Therefore, we wait to acquire
|
||||
// the lock on `map` until we also hold the global query write
|
||||
// lock.
|
||||
db.salsa_runtime().with_incremented_revision(|next_revision| {
|
||||
let mut map = self.map.write();
|
||||
|
||||
let mut map = RwLockUpgradableReadGuard::upgrade(map);
|
||||
// Do this *after* we acquire the lock, so that we are not
|
||||
// racing with somebody else to modify this same cell.
|
||||
// (Otherwise, someone else might write a *newer* revision
|
||||
// into the same cell while we block on the lock.)
|
||||
let changed_at = ChangedAt {
|
||||
is_constant: is_constant.0,
|
||||
revision: next_revision,
|
||||
};
|
||||
|
||||
// Do this *after* we acquire the lock, so that we are not
|
||||
// racing with somebody else to modify this same cell.
|
||||
// (Otherwise, someone else might write a *newer* revision
|
||||
// into the same cell while we block on the lock.)
|
||||
let changed_at = ChangedAt {
|
||||
is_constant: is_constant.0,
|
||||
revision: next_revision,
|
||||
};
|
||||
let stamped_value = StampedValue { value, changed_at };
|
||||
|
||||
let stamped_value = StampedValue { value, changed_at };
|
||||
match map.entry(key) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
assert!(
|
||||
!entry.get().changed_at.is_constant,
|
||||
"modifying `{:?}({:?})`, which was previously marked as constant (old value `{:?}`, new value `{:?}`)",
|
||||
Q::default(),
|
||||
entry.key(),
|
||||
entry.get().value,
|
||||
stamped_value.value,
|
||||
);
|
||||
|
||||
match map.entry(key) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
assert!(
|
||||
!entry.get().changed_at.is_constant,
|
||||
"modifying `{:?}({:?})`, which was previously marked as constant (old value `{:?}`, new value `{:?}`)",
|
||||
Q::default(),
|
||||
entry.key(),
|
||||
entry.get().value,
|
||||
stamped_value.value,
|
||||
);
|
||||
entry.insert(stamped_value);
|
||||
}
|
||||
|
||||
entry.insert(stamped_value);
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(stamped_value);
|
||||
}
|
||||
}
|
||||
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(stamped_value);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -90,7 +90,7 @@ where
|
|||
/// case, you can wrap the input with a "no-storage" query and
|
||||
/// invoke this method from time to time.
|
||||
pub fn next_revision(&self) {
|
||||
self.increment_revision();
|
||||
self.with_incremented_revision(|_| ());
|
||||
}
|
||||
|
||||
/// Default implementation for `Database::sweep_all`.
|
||||
|
@ -182,8 +182,16 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Increments the current revision counter and returns the new value.
|
||||
pub(crate) fn increment_revision(&self) -> Revision {
|
||||
/// Acquires the **global query write lock** (ensuring that no
|
||||
/// queries are executing) and then increments the current
|
||||
/// revision counter; invokes `op` with the global query write
|
||||
/// lock still held.
|
||||
///
|
||||
/// While we wait to acquire the global query write lock, this
|
||||
/// method will also increment `pending_revision_increments`, thus
|
||||
/// signalling to queries that their results are "canceled" and
|
||||
/// they should abort as expeditiously as possible.
|
||||
pub(crate) fn with_incremented_revision<R>(&self, op: impl FnOnce(Revision) -> R) -> R {
|
||||
log::debug!("increment_revision()");
|
||||
|
||||
if self.query_in_progress() {
|
||||
|
@ -217,13 +225,12 @@ where
|
|||
let old_revision = self.shared_state.revision.fetch_add(1, Ordering::SeqCst);
|
||||
assert!(old_revision != usize::max_value(), "revision overflow");
|
||||
|
||||
let result = Revision {
|
||||
let new_revision = Revision {
|
||||
generation: 1 + old_revision as u64,
|
||||
};
|
||||
debug!("increment_revision: incremented to {:?}", new_revision);
|
||||
|
||||
debug!("increment_revision: incremented to {:?}", result);
|
||||
|
||||
result
|
||||
op(new_revision)
|
||||
}
|
||||
|
||||
pub(crate) fn query_in_progress(&self) -> bool {
|
||||
|
|
Loading…
Reference in a new issue