treat constants more uniformly

We used to ignore constant inputs entirely. We now track them, but if we
find that a value is constant, we discard ITS inputs.  This means that
-- if we track dependencies -- we have an "outer rim" of constant
values.

Also take the opportunity to reshuffle how derived inputs represent
their state.
This commit is contained in:
Niko Matsakis 2018-10-22 09:52:49 -04:00
parent c4d93f9733
commit d429926ddd
4 changed files with 312 additions and 261 deletions

View file

@ -4,13 +4,13 @@ use crate::plumbing::QueryFunction;
use crate::plumbing::QueryStorageOps;
use crate::plumbing::UncheckedMutQueryStorageOps;
use crate::runtime::ChangedAt;
use crate::runtime::QueryDescriptorSet;
use crate::runtime::FxIndexSet;
use crate::runtime::Revision;
use crate::runtime::Runtime;
use crate::runtime::RuntimeId;
use crate::runtime::StampedValue;
use crate::Database;
use log::debug;
use log::{debug, info};
use parking_lot::Mutex;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use rustc_hash::FxHashMap;
@ -18,6 +18,7 @@ use smallvec::SmallVec;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
/// Memoized queries store the result plus a list of the other queries
/// that they invoked. This means we can avoid recomputing them when
@ -157,22 +158,56 @@ where
Q: QueryFunction<DB>,
DB: Database,
{
/// Last time the value has actually changed.
/// changed_at can be less than verified_at.
changed_at: ChangedAt,
/// The result of the query, if we decide to memoize it.
value: Option<Q::Value>,
/// The inputs that went into our query, if we are tracking them.
inputs: QueryDescriptorSet<DB>,
/// Last time that we checked our inputs to see if they have
/// changed. If this is equal to the current revision, then the
/// value is up to date. If not, we need to check our inputs and
/// see if any of them have changed since our last check -- if so,
/// we'll need to re-execute.
/// Last revision when this memo was verified (if there are
/// untracked inputs, this will also be when the memo was
/// created).
verified_at: Revision,
/// Last revision when the memoized value was observed to change.
changed_at: Revision,
/// The inputs that went into our query, if we are tracking them.
inputs: MemoInputs<DB>,
}
/// Describes what a memo knows about the inputs its query read. The
/// `Tracked` variant keeps them in an insertion-order-preserving set.
pub(crate) enum MemoInputs<DB: Database> {
/// The query read nothing, or everything it read was constant, so
/// there are no inputs to re-check.
Constant,
/// Every read was tracked and the full set of inputs is recorded.
/// Intended to be non-empty.
///
/// NOTE(review): `set_unchecked` builds this variant from
/// `Default::default()`, which looks like an empty set -- confirm
/// whether non-emptiness is really an invariant.
Tracked {
inputs: Arc<FxIndexSet<DB::QueryDescriptor>>,
},
/// At least one read was untracked, so the complete set of inputs
/// is unknown.
Untracked,
}
impl<DB: Database> MemoInputs<DB> {
    /// Returns `true` only for the `Constant` variant; `Tracked` and
    /// `Untracked` both report `false`.
    fn is_constant(&self) -> bool {
        match self {
            MemoInputs::Constant => true,
            MemoInputs::Tracked { .. } | MemoInputs::Untracked => false,
        }
    }
}
impl<DB: Database> std::fmt::Debug for MemoInputs<DB> {
    /// Formats the variant as a struct-style debug value; only `Tracked`
    /// carries a field (`inputs`).
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Decide the variant name and optional payload first, then build
        // the debug struct in one place.
        let (variant, tracked) = match self {
            MemoInputs::Constant => ("Constant", None),
            MemoInputs::Tracked { inputs } => ("Tracked", Some(inputs)),
            MemoInputs::Untracked => ("Untracked", None),
        };

        let mut out = fmt.debug_struct(variant);
        if let Some(inputs) = tracked {
            out.field("inputs", inputs);
        }
        out.finish()
    }
}
impl<DB, Q, MP> Default for DerivedStorage<DB, Q, MP>
@ -213,7 +248,7 @@ where
let revision_now = runtime.current_revision();
debug!(
info!(
"{:?}({:?}): invoked at {:?}",
Q::default(),
key,
@ -270,31 +305,29 @@ where
// first things first, let's walk over each of our previous
// inputs and check whether they are out of date.
if let Some(memo) = &mut old_memo {
if let Some(value) = memo.verify_memoized_value(db) {
debug!("{:?}({:?}): inputs still valid", Q::default(), key);
// If none of our inputs have changed since the last time we refreshed
// our value, then our value must still be good. We'll just patch
// the verified-at date and re-use it.
memo.verified_at = revision_now;
let changed_at = memo.changed_at;
if let Some(value) = memo.validate_memoized_value(db, revision_now) {
info!(
"{:?}({:?}): validated old memoized value",
Q::default(),
key
);
let new_value = StampedValue { value, changed_at };
self.overwrite_placeholder(
runtime,
descriptor,
key,
old_memo.unwrap(),
&new_value,
&value,
panic_guard,
);
return Ok(new_value);
return Ok(value);
}
}
// Query was not previously executed, or value is potentially
// stale, or value is absent. Let's execute!
let (mut stamped_value, inputs) = runtime.execute_query_implementation(descriptor, || {
debug!("{:?}({:?}): executing query", Q::default(), key);
let mut result = runtime.execute_query_implementation(descriptor, || {
info!("{:?}({:?}): executing query", Q::default(), key);
if !self.should_track_inputs(key) {
runtime.report_untracked_read();
@ -318,35 +351,63 @@ where
// old value.
if let Some(old_memo) = &old_memo {
if let Some(old_value) = &old_memo.value {
if MP::memoized_value_eq(&old_value, &stamped_value.value) {
assert!(old_memo.changed_at.revision <= stamped_value.changed_at.revision);
stamped_value.changed_at.revision = old_memo.changed_at.revision;
if MP::memoized_value_eq(&old_value, &result.value) {
assert!(old_memo.changed_at <= result.changed_at.revision);
result.changed_at.revision = old_memo.changed_at;
}
}
}
let new_value = StampedValue {
value: result.value,
changed_at: result.changed_at,
};
{
let value = if self.should_memoize_value(key) {
Some(stamped_value.value.clone())
Some(new_value.value.clone())
} else {
None
};
let inputs = match result.subqueries {
None => MemoInputs::Untracked,
Some(descriptors) => {
// If all things that we read were constants, then
// we don't need to track our inputs: our value
// can never be invalidated.
//
// If OTOH we read at least *some* non-constant
// inputs, then we do track our inputs (even the
// constants), so that if we run the GC, we know
// which constants we looked at.
if descriptors.is_empty() || result.changed_at.is_constant {
MemoInputs::Constant
} else {
MemoInputs::Tracked {
inputs: Arc::new(descriptors),
}
}
}
};
self.overwrite_placeholder(
runtime,
descriptor,
key,
Memo {
changed_at: stamped_value.changed_at,
value,
inputs,
changed_at: result.changed_at.revision,
verified_at: revision_now,
inputs,
},
&stamped_value,
&new_value,
panic_guard,
);
}
Ok(stamped_value)
Ok(new_value)
}
/// Helper for `read`:
@ -400,29 +461,17 @@ where
}
Some(QueryState::Memoized(memo)) => {
debug!(
"{:?}({:?}): found memoized value verified_at={:?}",
Q::default(),
key,
memo.verified_at,
);
debug!("{:?}({:?}): found memoized value", Q::default(), key);
// We've found that the query is definitely up-to-date.
// If the value is also memoized, return it.
// Otherwise fallback to recomputing the value.
if memo.verified_at == revision_now {
if let Some(value) = &memo.value {
debug!(
"{:?}({:?}): returning memoized value (changed_at={:?})",
Q::default(),
key,
memo.changed_at,
);
return ProbeState::UpToDate(Ok(StampedValue {
value: value.clone(),
changed_at: memo.changed_at,
}));
}
if let Some(value) = memo.probe_memoized_value(revision_now) {
info!(
"{:?}({:?}): returning memoized value changed at {:?}",
Q::default(),
key,
value.changed_at
);
return ProbeState::UpToDate(Ok(value));
}
}
@ -589,85 +638,95 @@ where
revision_now,
);
let descriptors = {
let map = self.map.read();
match map.get(key) {
// If somebody depends on us, but we have no map
// entry, that must mean that it was found to be out
// of date and removed.
None => return true,
// Acquire read lock to start. In some of the arms below, we
// drop this explicitly.
let map = self.map.read();
// This value is being actively recomputed. Wait for
// that thread to finish (assuming it's not dependent
// on us...) and check its associated revision.
Some(QueryState::InProgress { id, waiting }) => {
let other_id = *id;
return match self
.register_with_in_progress_thread(runtime, descriptor, other_id, waiting)
{
Ok(rx) => {
// Release our lock on `self.map`, so other thread
// can complete.
std::mem::drop(map);
// Look for a memoized value.
let memo = match map.get(key) {
// If somebody depends on us, but we have no map
// entry, that must mean that it was found to be out
// of date and removed.
None => return true,
let value = rx.recv().unwrap();
return value.changed_at.changed_since(revision);
}
// This value is being actively recomputed. Wait for
// that thread to finish (assuming it's not dependent
// on us...) and check its associated revision.
Some(QueryState::InProgress { id, waiting }) => {
let other_id = *id;
match self.register_with_in_progress_thread(runtime, descriptor, other_id, waiting)
{
Ok(rx) => {
// Release our lock on `self.map`, so other thread
// can complete.
std::mem::drop(map);
// Consider a cycle to have changed.
let value = rx.recv().unwrap();
return value.changed_at.changed_since(revision);
}
// Consider a cycle to have changed.
Err(CycleDetected) => return true,
}
}
Some(QueryState::Memoized(memo)) => memo,
};
if memo.verified_at == revision_now {
return memo.changed_at > revision;
}
let inputs = match &memo.inputs {
MemoInputs::Untracked => {
// we don't know the full set of
// inputs, so if there is a new
// revision, we must assume it is
// dirty
return true;
}
MemoInputs::Constant => None,
MemoInputs::Tracked { inputs } => {
// At this point, the value may be dirty (we have
// to check the descriptors). If we have a cached
// value, we'll just fall back to invoking `read`,
// which will do that checking (and a bit more) --
// note that we skip the "pure read" part as we
// already know the result.
assert!(inputs.len() > 0);
if memo.value.is_some() {
std::mem::drop(map);
return match self.read_upgrade(db, key, descriptor, revision_now) {
Ok(v) => v.changed_at.changed_since(revision),
Err(CycleDetected) => true,
};
}
Some(QueryState::Memoized(memo)) => {
// If our memo is still up to date, then check if we've
// changed since the revision.
if memo.verified_at == revision_now {
return memo.changed_at.changed_since(revision);
}
// As a special case, if we have no inputs, we are
// always clean. No need to update `verified_at`.
if let QueryDescriptorSet::Constant = memo.inputs {
return false;
}
// At this point, the value may be dirty (we have
// to check the descriptors). If we have a cached
// value, we'll just fall back to invoking `read`,
// which will do that checking (and a bit more) --
// note that we skip the "pure read" part as we
// already know the result.
if memo.value.is_some() {
drop(map);
return match self.read_upgrade(db, key, descriptor, revision_now) {
Ok(v) => v.changed_at.changed_since(revision),
Err(CycleDetected) => true,
};
}
// If there are no inputs or we don't know the
// inputs, we can answer right away.
match &memo.inputs {
QueryDescriptorSet::Constant => return false,
QueryDescriptorSet::Untracked => return true,
QueryDescriptorSet::Tracked { descriptors } => descriptors.clone(),
}
}
Some(inputs.clone())
}
};
let maybe_changed = descriptors
// We have a **tracked set of inputs**
// (found in `inputs`) that needs to
// be validated.
std::mem::drop(map);
// Iterate the inputs and see if any have maybe changed.
let maybe_changed = inputs
.iter()
.filter(|descriptor| descriptor.maybe_changed_since(db, revision))
.inspect(|old_input| {
.flat_map(|inputs| inputs.iter())
.filter(|input| input.maybe_changed_since(db, revision))
.inspect(|input| {
debug!(
"{:?}({:?}): input `{:?}` may have changed",
Q::default(),
key,
old_input
input
)
}).next()
})
.next()
.is_some();
// Either way, we have to update our entry.
@ -700,7 +759,7 @@ where
match map_read.get(key) {
None => false,
Some(QueryState::InProgress { .. }) => panic!("query in progress"),
Some(QueryState::Memoized(memo)) => memo.changed_at.is_constant(),
Some(QueryState::Memoized(memo)) => memo.inputs.is_constant(),
}
}
}
@ -717,18 +776,16 @@ where
let mut map_write = self.map.write();
let current_revision = db.salsa_runtime().current_revision();
let changed_at = ChangedAt {
is_constant: false,
revision: current_revision,
};
map_write.insert(
key,
QueryState::Memoized(Memo {
value: Some(value),
changed_at,
inputs: QueryDescriptorSet::default(),
changed_at: current_revision,
verified_at: current_revision,
inputs: MemoInputs::Tracked {
inputs: Default::default(),
},
}),
);
}
@ -739,53 +796,90 @@ where
Q: QueryFunction<DB>,
DB: Database,
{
fn verify_memoized_value(&self, db: &DB) -> Option<Q::Value> {
fn validate_memoized_value(
&mut self,
db: &DB,
revision_now: Revision,
) -> Option<StampedValue<Q::Value>> {
// If we don't have a memoized value, nothing to validate.
if let Some(v) = &self.value {
// If inputs are still valid.
if self.verify_inputs(db) {
return Some(v.clone());
let value = self.value.as_ref()?;
assert!(self.verified_at != revision_now);
let verified_at = self.verified_at;
let is_constant = match &mut self.inputs {
// We can't validate values that had untracked inputs; just have to
// re-execute.
MemoInputs::Untracked { .. } => {
return None;
}
// Constant: no changed input
MemoInputs::Constant => true,
// Check whether any of our inputs changed since the
// **last point where we were verified** (not since we
// last changed). This is important: if we have
// memoized values, then an input may have changed in
// revision R2, but we found that *our* value was the
// same regardless, so our change date is still
// R1. But our *verification* date will be R2, and we
// are only interested in finding out whether the
// input changed *again*.
MemoInputs::Tracked { inputs } => {
let changed_input = inputs
.iter()
.filter(|input| input.maybe_changed_since(db, verified_at))
.next();
if let Some(input) = changed_input {
debug!(
"{:?}::validate_memoized_value: `{:?}` may have changed",
Q::default(),
input
);
return None;
}
false
}
};
self.verified_at = revision_now;
Some(StampedValue {
changed_at: ChangedAt {
is_constant,
revision: self.changed_at,
},
value: value.clone(),
})
}
/// Returns a clone of the memoized value, stamped with its change
/// information, *if* the memo is known to be up to date in
/// `revision_now` (i.e. it was verified in that revision). Returns
/// `None` when no value was memoized or the memo has not been
/// verified in `revision_now`.
fn probe_memoized_value(&self, revision_now: Revision) -> Option<StampedValue<Q::Value>> {
    let memoized = self.value.as_ref()?;

    debug!(
        "probe_memoized_value(verified_at={:?}, changed_at={:?})",
        self.verified_at, self.changed_at,
    );

    // Not verified in this revision, so this probe cannot vouch for it.
    if self.verified_at != revision_now {
        return None;
    }

    Some(StampedValue {
        changed_at: ChangedAt {
            is_constant: self.inputs.is_constant(),
            revision: self.changed_at,
        },
        value: memoized.clone(),
    })
}
/// Decides whether the recorded inputs still support the memoized
/// value: `true` for constant inputs, `false` for untracked inputs,
/// and for tracked inputs `true` only if none of them may have
/// changed since `self.verified_at`.
fn verify_inputs(&self, db: &DB) -> bool {
    let descriptors = match &self.inputs {
        QueryDescriptorSet::Untracked => return false,
        QueryDescriptorSet::Constant => {
            debug_assert!(self.changed_at.is_constant);
            return true;
        }
        QueryDescriptorSet::Tracked { descriptors } => descriptors,
    };

    debug_assert!(!descriptors.is_empty());
    debug_assert!(!self.changed_at.is_constant);

    // The comparison point is the revision in which we were **last
    // verified**, not the one in which we last changed. An input may
    // have changed in R2 while *our* value came out the same, leaving
    // our change date at R1; our verification date is then R2, and
    // all we care about is whether the input changed *again* after
    // that.
    let first_changed = descriptors
        .iter()
        .find(|old_input| old_input.maybe_changed_since(db, self.verified_at));

    if let Some(old_input) = first_changed {
        debug!(
            "{:?}::verify_descriptors: `{:?}` may have changed",
            Q::default(),
            old_input
        );
    }

    first_changed.is_none()
}
}

View file

@ -80,13 +80,10 @@ where
// still intact, they just have conservative
// dependencies. The next revision, they may wind up
// with something more precise.
if is_constant.0 && !old_value.changed_at.is_constant() {
if is_constant.0 && !old_value.changed_at.is_constant {
let mut map = RwLockUpgradableReadGuard::upgrade(map);
let old_value = map.get_mut(key).unwrap();
old_value.changed_at = ChangedAt {
is_constant: true,
revision: db.salsa_runtime().current_revision(),
};
old_value.changed_at.is_constant = true;
}
return;
@ -121,7 +118,7 @@ where
match map.entry(key) {
Entry::Occupied(mut entry) => {
assert!(
!entry.get().changed_at.is_constant(),
!entry.get().changed_at.is_constant,
"modifying `{:?}({:?})`, which was previously marked as constant (old value `{:?}`, new value `{:?}`)",
Q::default(),
entry.key(),
@ -198,7 +195,7 @@ where
let map_read = self.map.read();
map_read
.get(key)
.map(|v| v.changed_at.is_constant())
.map(|v| v.changed_at.is_constant)
.unwrap_or(false)
}
}

View file

@ -10,7 +10,7 @@ use std::hash::BuildHasherDefault;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
type FxIndexSet<K> = indexmap::IndexSet<K, BuildHasherDefault<FxHasher>>;
pub(crate) type FxIndexSet<K> = indexmap::IndexSet<K, BuildHasherDefault<FxHasher>>;
/// The salsa runtime stores the storage for all queries as well as
/// tracking the query stack and dependencies between cycles.
@ -229,7 +229,7 @@ where
&self,
descriptor: &DB::QueryDescriptor,
execute: impl FnOnce() -> V,
) -> (StampedValue<V>, QueryDescriptorSet<DB>) {
) -> ComputedQueryResult<DB, V> {
debug!("{:?}: execute_query_implementation invoked", descriptor);
// Push the active query onto the stack.
@ -258,34 +258,11 @@ where
local_state.query_stack.pop().unwrap()
};
let is_constant = match &subqueries {
Some(set) => set.is_empty(),
None => false,
};
let query_descriptor_set = match subqueries {
None => QueryDescriptorSet::Untracked,
Some(set) => {
if set.is_empty() {
QueryDescriptorSet::Constant
} else {
QueryDescriptorSet::Tracked {
descriptors: Arc::new(set),
}
}
}
};
(
StampedValue {
value,
changed_at: ChangedAt {
is_constant,
revision: changed_at,
},
},
query_descriptor_set,
)
ComputedQueryResult {
value,
changed_at,
subqueries,
}
}
/// Reports that the currently active query read the result from
@ -474,18 +451,41 @@ struct ActiveQuery<DB: Database> {
/// What query is executing
descriptor: DB::QueryDescriptor,
/// Records the maximum revision where any subquery changed
changed_at: Revision,
/// Maximum revision of all inputs thus far;
/// we also track if all inputs have been constant.
///
/// If we see an untracked input, this is not terribly relevant.
changed_at: ChangedAt,
/// Each subquery
/// Set of subqueries that were accessed thus far, or `None` if
/// there was an untracked read.
subqueries: Option<FxIndexSet<DB::QueryDescriptor>>,
}
/// What `execute_query_implementation` hands back after running a
/// query: the value it produced plus the change and dependency
/// information gathered while it ran.
pub(crate) struct ComputedQueryResult<DB: Database, V> {
/// Final value produced by the query.
pub(crate) value: V,
/// Maximum revision of all inputs observed; `is_constant` is true
/// if all inputs were constants.
///
/// If we observe an untracked read, this will be set to a
/// non-constant value that changed in the most recent revision.
pub(crate) changed_at: ChangedAt,
/// Complete set of subqueries that were accessed, or `None` if
/// there was an untracked read.
pub(crate) subqueries: Option<FxIndexSet<DB::QueryDescriptor>>,
}
impl<DB: Database> ActiveQuery<DB> {
fn new(descriptor: DB::QueryDescriptor) -> Self {
ActiveQuery {
descriptor,
changed_at: Revision::ZERO,
changed_at: ChangedAt {
is_constant: true,
revision: Revision::ZERO,
},
subqueries: Some(FxIndexSet::default()),
}
}
@ -496,20 +496,18 @@ impl<DB: Database> ActiveQuery<DB> {
revision,
} = changed_at;
if is_constant {
// When we read constant values, we don't need to
// track the source of the value.
} else {
if let Some(set) = &mut self.subqueries {
set.insert(subquery.clone());
}
self.changed_at = self.changed_at.max(revision);
if let Some(set) = &mut self.subqueries {
set.insert(subquery.clone());
}
self.changed_at.is_constant &= is_constant;
self.changed_at.revision = self.changed_at.revision.max(revision);
}
fn add_untracked_read(&mut self, changed_at: Revision) {
self.subqueries = None;
self.changed_at = self.changed_at.max(changed_at);
self.changed_at.is_constant = false;
self.changed_at.revision = changed_at;
}
}
@ -546,52 +544,14 @@ pub struct ChangedAt {
}
impl ChangedAt {
pub fn is_constant(self) -> bool {
self.is_constant
}
/// True if a value stored with this `ChangedAt` has changed after
/// `revision`. This is invoked by query storage when its dependents
/// are asking whether it has changed.
pub fn changed_since(self, revision: Revision) -> bool {
pub(crate) fn changed_since(self, revision: Revision) -> bool {
self.revision > revision
}
}
/// Summarizes the inputs a query accessed during execution. The
/// `Tracked` variant holds them in an insertion-order-preserving set.
pub(crate) enum QueryDescriptorSet<DB: Database> {
/// No inputs were recorded (the set of subqueries was empty).
Constant,
/// All reads were to tracked things; holds the non-empty set of
/// descriptors that were read.
Tracked {
descriptors: Arc<FxIndexSet<DB::QueryDescriptor>>,
},
/// Some reads were to an untracked thing, so the full set of inputs
/// is not known.
Untracked,
}
impl<DB: Database> std::fmt::Debug for QueryDescriptorSet<DB> {
    /// Formats the variant as a struct-style debug value; only `Tracked`
    /// carries a field (`descriptors`).
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Decide the variant name and optional payload first, then build
        // the debug struct in one place.
        let (variant, tracked) = match self {
            QueryDescriptorSet::Constant => ("Constant", None),
            QueryDescriptorSet::Tracked { descriptors } => ("Tracked", Some(descriptors)),
            QueryDescriptorSet::Untracked => ("Untracked", None),
        };

        let mut out = fmt.debug_struct(variant);
        if let Some(descriptors) = tracked {
            out.field("descriptors", descriptors);
        }
        out.finish()
    }
}
impl<DB: Database> Default for QueryDescriptorSet<DB> {
/// The default is `Constant`, the variant that stands for "no inputs".
fn default() -> Self {
QueryDescriptorSet::Constant
}
}
#[derive(Clone, Debug)]
pub(crate) struct StampedValue<V> {
pub(crate) value: V,

View file

@ -68,7 +68,7 @@ fn revalidate() {
query.salsa_runtime().next_revision();
query.memoized2();
query.assert_log(&["Volatile invoked", "Memoized1 invoked"]);
query.assert_log(&["Memoized1 invoked", "Volatile invoked"]);
query.memoized2();
query.assert_log(&[]);
@ -79,7 +79,7 @@ fn revalidate() {
query.salsa_runtime().next_revision();
query.memoized2();
query.assert_log(&["Volatile invoked", "Memoized1 invoked", "Memoized2 invoked"]);
query.assert_log(&["Memoized1 invoked", "Volatile invoked", "Memoized2 invoked"]);
query.memoized2();
query.assert_log(&[]);