mirror of
https://github.com/salsa-rs/salsa.git
synced 2025-01-13 00:40:22 +00:00
refactor probe
to be more generic
This commit is contained in:
parent
be08029f8c
commit
d2c3025009
1 changed files with 73 additions and 25 deletions
|
@ -188,11 +188,12 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Return value of `probe` helper.
|
||||
enum ProbeState<V, G> {
|
||||
UpToDate(StampedValue<V>),
|
||||
UpToDate(V),
|
||||
CycleDetected,
|
||||
StaleOrAbsent(G),
|
||||
BlockedOnOtherThread(RuntimeId),
|
||||
BlockedOnOtherThread,
|
||||
}
|
||||
|
||||
impl<DB, Q, MP> DerivedStorage<DB, Q, MP>
|
||||
|
@ -232,22 +233,20 @@ where
|
|||
// executing in parallel.
|
||||
let mut old_value = loop {
|
||||
// Read-lock check.
|
||||
match self.probe(self.map.read(), runtime, revision_now, descriptor, key) {
|
||||
match self.read_probe(self.map.read(), runtime, revision_now, descriptor, key) {
|
||||
ProbeState::UpToDate(v) => return Ok(v),
|
||||
ProbeState::CycleDetected => return Err(CycleDetected),
|
||||
ProbeState::BlockedOnOtherThread(other_id) => {
|
||||
self.await_other_thread(other_id, key);
|
||||
ProbeState::BlockedOnOtherThread => {
|
||||
continue;
|
||||
}
|
||||
ProbeState::StaleOrAbsent(_guard) => (),
|
||||
}
|
||||
|
||||
// Write-lock check: install `InProgress` sentinel if no usable value.
|
||||
match self.probe(self.map.write(), runtime, revision_now, descriptor, key) {
|
||||
match self.read_probe(self.map.write(), runtime, revision_now, descriptor, key) {
|
||||
ProbeState::UpToDate(v) => return Ok(v),
|
||||
ProbeState::CycleDetected => return Err(CycleDetected),
|
||||
ProbeState::BlockedOnOtherThread(other_id) => {
|
||||
self.await_other_thread(other_id, key);
|
||||
ProbeState::BlockedOnOtherThread => {
|
||||
continue;
|
||||
}
|
||||
ProbeState::StaleOrAbsent(mut map) => {
|
||||
|
@ -336,30 +335,88 @@ where
|
|||
///
|
||||
/// Otherwise, returns `Err(map)` where `map` is the lock guard
|
||||
/// that was given in as argument.
|
||||
fn probe<MapGuard>(
|
||||
fn read_probe<MapGuard>(
|
||||
&self,
|
||||
map: MapGuard,
|
||||
runtime: &Runtime<DB>,
|
||||
revision_now: Revision,
|
||||
descriptor: &DB::QueryDescriptor,
|
||||
key: &Q::Key,
|
||||
) -> ProbeState<Q::Value, MapGuard>
|
||||
) -> ProbeState<StampedValue<Q::Value>, MapGuard>
|
||||
where
|
||||
MapGuard: Deref<Target = FxHashMap<Q::Key, QueryState<DB, Q>>>,
|
||||
{
|
||||
self.probe(map, runtime, revision_now, descriptor, key, |memo| {
|
||||
if let Some(value) = &memo.value {
|
||||
debug!(
|
||||
"{:?}({:?}): returning memoized value (changed_at={:?})",
|
||||
Q::default(),
|
||||
key,
|
||||
memo.changed_at,
|
||||
);
|
||||
Some(StampedValue {
|
||||
value: value.clone(),
|
||||
changed_at: memo.changed_at,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper:
|
||||
///
|
||||
/// Invoked with the guard `map` of some lock on `self.map` (read
|
||||
/// or write) as well as details about the key to look up. It will
|
||||
/// check the map and return a suitable `ProbeState`:
|
||||
///
|
||||
/// - `ProbeState::UpToDate(r)` if the memo is up-to-date,
|
||||
/// and invoking `with_up_to_date_memo` returned `Some(r)`.
|
||||
/// - `ProbeState::CycleDetected` if this thread is (directly or
|
||||
/// indirectly) already computing this value.
|
||||
/// - `ProbeState::BlockedOnOtherThread` if some other thread
|
||||
/// (which does not depend on us) was already computing this
|
||||
/// value; caller should re-acquire the lock and try again.
|
||||
/// - `ProbeState::StaleOrAbsent` if either (a) there is no memo for this key,
|
||||
/// (b) the memo has not been verified at the current revision, or
|
||||
/// (c) `with_up_to_date_memo` returned `None`.
|
||||
///
|
||||
/// Note that in all cases **except** for `StaleOrAbsent`, the lock on
|
||||
/// `map` will have been released.
|
||||
fn probe<MapGuard, R>(
|
||||
&self,
|
||||
map: MapGuard,
|
||||
runtime: &Runtime<DB>,
|
||||
revision_now: Revision,
|
||||
descriptor: &DB::QueryDescriptor,
|
||||
key: &Q::Key,
|
||||
with_up_to_date_memo: impl FnOnce(&Memo<DB, Q>) -> Option<R>,
|
||||
) -> ProbeState<R, MapGuard>
|
||||
where
|
||||
MapGuard: Deref<Target = FxHashMap<Q::Key, QueryState<DB, Q>>>,
|
||||
{
|
||||
match map.get(key) {
|
||||
Some(QueryState::InProgress { id, others_waiting }) => {
|
||||
if *id == runtime.id() {
|
||||
let other_id = *id;
|
||||
if other_id == runtime.id() {
|
||||
return ProbeState::CycleDetected;
|
||||
} else {
|
||||
if !runtime.try_block_on(descriptor, *id) {
|
||||
if !runtime.try_block_on(descriptor, other_id) {
|
||||
return ProbeState::CycleDetected;
|
||||
}
|
||||
|
||||
// The reader of this will have to acquire map
|
||||
// lock, we don't need any particular ordering.
|
||||
others_waiting.store(true, Ordering::Relaxed);
|
||||
return ProbeState::BlockedOnOtherThread(*id);
|
||||
|
||||
// Release our lock on `self.map`, so other thread
|
||||
// can complete.
|
||||
std::mem::drop(map);
|
||||
|
||||
// Wait for other thread to overwrite this placeholder.
|
||||
self.await_other_thread(other_id, key);
|
||||
|
||||
return ProbeState::BlockedOnOtherThread;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -375,18 +432,9 @@ where
|
|||
// If the value is also memoized, return it.
|
||||
// Otherwise fallback to recomputing the value.
|
||||
if m.verified_at == revision_now {
|
||||
if let Some(value) = &m.value {
|
||||
debug!(
|
||||
"{:?}({:?}): returning memoized value (changed_at={:?})",
|
||||
Q::default(),
|
||||
key,
|
||||
m.changed_at,
|
||||
);
|
||||
return ProbeState::UpToDate(StampedValue {
|
||||
value: value.clone(),
|
||||
changed_at: m.changed_at,
|
||||
});
|
||||
};
|
||||
if let Some(r) = with_up_to_date_memo(&m) {
|
||||
return ProbeState::UpToDate(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue