From d2c3025009f119d9f7654530d8d1f61956408a3d Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Sun, 14 Oct 2018 07:23:38 -0400 Subject: [PATCH] refactor `probe` to be more generic --- src/derived.rs | 98 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 73 insertions(+), 25 deletions(-) diff --git a/src/derived.rs b/src/derived.rs index 4845435b..235c7dcd 100644 --- a/src/derived.rs +++ b/src/derived.rs @@ -188,11 +188,12 @@ where } } +/// Return value of `probe` helper. enum ProbeState { - UpToDate(StampedValue), + UpToDate(V), CycleDetected, StaleOrAbsent(G), - BlockedOnOtherThread(RuntimeId), + BlockedOnOtherThread, } impl DerivedStorage @@ -232,22 +233,20 @@ where // executing in parallel. let mut old_value = loop { // Read-lock check. - match self.probe(self.map.read(), runtime, revision_now, descriptor, key) { + match self.read_probe(self.map.read(), runtime, revision_now, descriptor, key) { ProbeState::UpToDate(v) => return Ok(v), ProbeState::CycleDetected => return Err(CycleDetected), - ProbeState::BlockedOnOtherThread(other_id) => { - self.await_other_thread(other_id, key); + ProbeState::BlockedOnOtherThread => { continue; } ProbeState::StaleOrAbsent(_guard) => (), } // Write-lock check: install `InProgress` sentinel if no usable value. - match self.probe(self.map.write(), runtime, revision_now, descriptor, key) { + match self.read_probe(self.map.write(), runtime, revision_now, descriptor, key) { ProbeState::UpToDate(v) => return Ok(v), ProbeState::CycleDetected => return Err(CycleDetected), - ProbeState::BlockedOnOtherThread(other_id) => { - self.await_other_thread(other_id, key); + ProbeState::BlockedOnOtherThread => { continue; } ProbeState::StaleOrAbsent(mut map) => { @@ -336,30 +335,88 @@ where /// /// Otherwise, returns `Err(map)` where `map` is the lock guard /// that was given in as argument. - fn probe( + fn read_probe( &self, map: MapGuard, runtime: &Runtime, revision_now: Revision, descriptor: &DB::QueryDescriptor, key: &Q::Key, - ) -> ProbeState + ) -> ProbeState, MapGuard> + where + MapGuard: Deref>>, + { + self.probe(map, runtime, revision_now, descriptor, key, |memo| { + if let Some(value) = &memo.value { + debug!( + "{:?}({:?}): returning memoized value (changed_at={:?})", + Q::default(), + key, + memo.changed_at, + ); + Some(StampedValue { + value: value.clone(), + changed_at: memo.changed_at, + }) + } else { + None + } + }) + } + + /// Helper: + /// + /// Invoked with the guard `map` of some lock on `self.map` (read + /// or write) as well as details about the key to look up. It will + /// check the map and return a suitable `ProbeState`: + /// + /// - `ProbeState::UpToDate(r)` if the memo is up-to-date, + /// and invoking `with_up_to_date_memo` returned `Some(r)`. + /// - `ProbeState::CycleDetected` if this thread is (directly or + /// indirectly) already computing this value. + /// - `ProbeState::BlockedOnOtherThread` if some other thread + /// (which does not depend on us) was already computing this + /// value; caller should re-acquire the lock and try again. + /// - `ProbeState::StaleOrAbsent` if either (a) there is no memo for this key, + /// (b) the memo has not been verified at the current revision, or + /// (c) `with_up_to_date_memo` returned `None`. + /// + /// Note that in all cases **except** for `StaleOrAbsent`, the lock on + /// `map` will have been released. + fn probe( + &self, + map: MapGuard, + runtime: &Runtime, + revision_now: Revision, + descriptor: &DB::QueryDescriptor, + key: &Q::Key, + with_up_to_date_memo: impl FnOnce(&Memo) -> Option, + ) -> ProbeState where MapGuard: Deref>>, { match map.get(key) { Some(QueryState::InProgress { id, others_waiting }) => { - if *id == runtime.id() { + let other_id = *id; + if other_id == runtime.id() { return ProbeState::CycleDetected; } else { - if !runtime.try_block_on(descriptor, *id) { + if !runtime.try_block_on(descriptor, other_id) { return ProbeState::CycleDetected; } // The reader of this will have to acquire map // lock, we don't need any particular ordering. others_waiting.store(true, Ordering::Relaxed); - return ProbeState::BlockedOnOtherThread(*id); + + // Release our lock on `self.map`, so other thread + // can complete. + std::mem::drop(map); + + // Wait for other thread to overwrite this placeholder. + self.await_other_thread(other_id, key); + + return ProbeState::BlockedOnOtherThread; } } @@ -375,18 +432,9 @@ where // If the value is also memoized, return it. // Otherwise fallback to recomputing the value. if m.verified_at == revision_now { - if let Some(value) = &m.value { - debug!( - "{:?}({:?}): returning memoized value (changed_at={:?})", - Q::default(), - key, - m.changed_at, - ); - return ProbeState::UpToDate(StampedValue { - value: value.clone(), - changed_at: m.changed_at, - }); - }; + if let Some(r) = with_up_to_date_memo(&m) { + return ProbeState::UpToDate(r); + } } }