mirror of
https://github.com/salsa-rs/salsa.git
synced 2025-01-29 15:49:13 +00:00
midpoint: make runtime take wait-result
Thi sis an intermediate step towards having the runtime coordinate wakeups.
This commit is contained in:
parent
21cb4ef9d6
commit
3c094d2932
3 changed files with 60 additions and 25 deletions
|
@ -774,30 +774,22 @@ where
|
|||
QueryState::InProgress { id, waiting } => {
|
||||
assert_eq!(id, self.runtime.id());
|
||||
|
||||
let opt_wait_result = new_value.map(|(new_value, ref cycle)| WaitResult {
|
||||
value: StampedValue {
|
||||
value: (),
|
||||
durability: new_value.durability,
|
||||
changed_at: new_value.changed_at,
|
||||
},
|
||||
cycle: cycle.clone(),
|
||||
});
|
||||
|
||||
self.runtime
|
||||
.unblock_queries_blocked_on_self(self.database_key_index);
|
||||
.unblock_queries_blocked_on(self.database_key_index, opt_wait_result.clone());
|
||||
|
||||
match new_value {
|
||||
// If anybody has installed themselves in our "waiting"
|
||||
// list, notify them that the value is available.
|
||||
Some((new_value, ref cycle)) => {
|
||||
for promise in waiting.into_inner() {
|
||||
promise.fulfill(WaitResult {
|
||||
value: StampedValue {
|
||||
value: (),
|
||||
durability: new_value.durability,
|
||||
changed_at: new_value.changed_at,
|
||||
},
|
||||
cycle: cycle.clone(),
|
||||
});
|
||||
}
|
||||
if let Some(wait_result) = opt_wait_result {
|
||||
for w in waiting.into_inner() {
|
||||
w.fulfill(wait_result.clone());
|
||||
}
|
||||
|
||||
// We have no value to send when we are panicking.
|
||||
// Therefore, we need to drop the sending half of the
|
||||
// channel so that our panic propagates to those waiting
|
||||
// on the receiving half.
|
||||
None => std::mem::drop(waiting),
|
||||
}
|
||||
}
|
||||
_ => panic!(
|
||||
|
|
|
@ -42,7 +42,7 @@ pub struct Runtime {
|
|||
shared_state: Arc<SharedState>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct WaitResult {
|
||||
pub(crate) value: StampedValue<()>,
|
||||
pub(crate) cycle: Vec<DatabaseKeyIndex>,
|
||||
|
@ -443,11 +443,20 @@ impl Runtime {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn unblock_queries_blocked_on_self(&self, database_key_index: DatabaseKeyIndex) {
|
||||
/// Invoked when this runtime completed computing `database_key` with
|
||||
/// the given result `wait_result` (`wait_result` should be `None` if
|
||||
/// computing `database_key` panicked and could not complete).
|
||||
/// This function unblocks any dependent queries and allows them
|
||||
/// to continue executing.
|
||||
pub(crate) fn unblock_queries_blocked_on(
|
||||
&self,
|
||||
database_key: DatabaseKeyIndex,
|
||||
wait_result: Option<WaitResult>,
|
||||
) {
|
||||
self.shared_state
|
||||
.dependency_graph
|
||||
.lock()
|
||||
.remove_edge(database_key_index, self.id())
|
||||
.unblock_dependents_of(database_key, self.id(), wait_result);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use crate::runtime::WaitResult;
|
||||
use crate::{DatabaseKeyIndex, RuntimeId};
|
||||
use parking_lot::MutexGuard;
|
||||
use rustc_hash::FxHashMap;
|
||||
|
@ -14,6 +15,11 @@ pub(super) struct DependencyGraph {
|
|||
/// Encodes the `RuntimeId` that are blocked waiting for the result
|
||||
/// of a given query.
|
||||
query_dependents: FxHashMap<DatabaseKeyIndex, SmallVec<[RuntimeId; 4]>>,
|
||||
|
||||
/// When a key K completes which had dependent queries Qs blocked on it,
|
||||
/// it stores its `WaitResult` here. As they wake up, each query Q in Qs will
|
||||
/// come here to fetch their results.
|
||||
wait_resuilts: FxHashMap<RuntimeId, WaitResult>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -88,7 +94,33 @@ impl DependencyGraph {
|
|||
.push(from_id);
|
||||
}
|
||||
|
||||
pub(super) fn remove_edge(&mut self, database_key: DatabaseKeyIndex, to_id: RuntimeId) {
|
||||
/// Invoked when runtime `to_id` completes executing
|
||||
/// `database_key`.
|
||||
pub(super) fn unblock_dependents_of(
|
||||
&mut self,
|
||||
database_key: DatabaseKeyIndex,
|
||||
to_id: RuntimeId,
|
||||
wait_result: Option<WaitResult>,
|
||||
) {
|
||||
let dependents = self.remove_edge(database_key, to_id);
|
||||
|
||||
// FIXME: Not ready for this yet
|
||||
//
|
||||
// for d in dependents {
|
||||
// self.wait_resuilts.insert(d, wait_result.clone());
|
||||
// }
|
||||
drop(dependents);
|
||||
}
|
||||
|
||||
/// Remove all dependency edges into `database_key`
|
||||
/// (being computed by `to_id`) and return the list of
|
||||
/// dependent runtimes that were waiting for `database_key`
|
||||
/// to complete.
|
||||
fn remove_edge(
|
||||
&mut self,
|
||||
database_key: DatabaseKeyIndex,
|
||||
to_id: RuntimeId,
|
||||
) -> impl IntoIterator<Item = RuntimeId> {
|
||||
let vec = self
|
||||
.query_dependents
|
||||
.remove(&database_key)
|
||||
|
@ -98,6 +130,8 @@ impl DependencyGraph {
|
|||
let to_id1 = self.edges.remove(from_id).map(|edge| edge.id);
|
||||
assert_eq!(Some(to_id), to_id1);
|
||||
}
|
||||
|
||||
vec
|
||||
}
|
||||
|
||||
pub(super) fn push_cycle_path(
|
||||
|
|
Loading…
Reference in a new issue