mirror of
https://github.com/salsa-rs/salsa.git
synced 2025-01-27 15:07:03 +00:00
tracking pending increments and add is_current_revision_canceled
This commit is contained in:
parent
4a8b264b7f
commit
41b36da054
1 changed files with 59 additions and 3 deletions
|
@ -2,12 +2,12 @@ use crate::Database;
|
||||||
use crate::Query;
|
use crate::Query;
|
||||||
use crate::QueryFunction;
|
use crate::QueryFunction;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
|
||||||
use rustc_hash::FxHasher;
|
use rustc_hash::FxHasher;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
use std::hash::BuildHasherDefault;
|
use std::hash::BuildHasherDefault;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
type FxIndexSet<K> = indexmap::IndexSet<K, BuildHasherDefault<FxHasher>>;
|
type FxIndexSet<K> = indexmap::IndexSet<K, BuildHasherDefault<FxHasher>>;
|
||||||
|
@ -34,6 +34,7 @@ where
|
||||||
storage: Default::default(),
|
storage: Default::default(),
|
||||||
revision_lock: Default::default(),
|
revision_lock: Default::default(),
|
||||||
revision: Default::default(),
|
revision: Default::default(),
|
||||||
|
pending_revision_increments: Default::default(),
|
||||||
}),
|
}),
|
||||||
local_state: RefCell::new(LocalState {
|
local_state: RefCell::new(LocalState {
|
||||||
query_stack: Default::default(),
|
query_stack: Default::default(),
|
||||||
|
@ -75,17 +76,65 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if the current revision is canceled. If this method ever
|
||||||
|
/// returns true, the currently executing query is also marked as
|
||||||
|
/// having an *untracked read* -- this means that, in the next
|
||||||
|
/// revision, we will always recompute its value "as if" some
|
||||||
|
/// input had changed. This means that, if your revision is
|
||||||
|
/// canceled (which indicates that current query results will be
|
||||||
|
/// ignored) your query is free to shortcircuit and return
|
||||||
|
/// whatever it likes.
|
||||||
|
pub fn is_current_revision_canceled(&self) -> bool {
|
||||||
|
let pending_revision_increments = self
|
||||||
|
.shared_state
|
||||||
|
.pending_revision_increments
|
||||||
|
.load(Ordering::SeqCst);
|
||||||
|
if pending_revision_increments > 0 {
|
||||||
|
self.report_untracked_read();
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Increments the current revision counter and returns the new value.
|
/// Increments the current revision counter and returns the new value.
|
||||||
pub(crate) fn increment_revision(&self) -> Revision {
|
pub(crate) fn increment_revision(&self) -> Revision {
|
||||||
if !self.local_state.borrow().query_stack.is_empty() {
|
if !self.local_state.borrow().query_stack.is_empty() {
|
||||||
panic!("increment_revision invoked during a query computation");
|
panic!("increment_revision invoked during a query computation");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get an (upgradable) read lock, so that we are sure nobody
|
||||||
|
// else is changing the current revision.
|
||||||
|
let lock = self.shared_state.revision_lock.upgradable_read();
|
||||||
|
|
||||||
|
// Flag current revision as cancelled.
|
||||||
|
// `increment_revision` calls, they may all set the
|
||||||
|
let old_pending_revision_increments = self
|
||||||
|
.shared_state
|
||||||
|
.pending_revision_increments
|
||||||
|
.fetch_add(1, Ordering::SeqCst);
|
||||||
|
assert!(
|
||||||
|
old_pending_revision_increments != usize::max_value(),
|
||||||
|
"pending increment overflow"
|
||||||
|
);
|
||||||
|
|
||||||
// To modify the revision, we need the lock.
|
// To modify the revision, we need the lock.
|
||||||
let _lock = self.shared_state.revision_lock.write();
|
let _lock = RwLockUpgradableReadGuard::upgrade(lock);
|
||||||
|
|
||||||
|
// *Before* updating the revision number, reset
|
||||||
|
// `revision_cancelled` to false. This way, if anybody should
|
||||||
|
// happen to invoke `is_current_revision_canceled` before we
|
||||||
|
// update the number, they don't get an incorrect result (but
|
||||||
|
// note that, because we hold `revision_lock`, no queries can
|
||||||
|
// be currently executing anyhow, so it's sort of a moot
|
||||||
|
// point).
|
||||||
|
self.shared_state
|
||||||
|
.pending_revision_increments
|
||||||
|
.fetch_sub(1, Ordering::SeqCst);
|
||||||
|
|
||||||
let old_revision = self.shared_state.revision.fetch_add(1, Ordering::SeqCst);
|
let old_revision = self.shared_state.revision.fetch_add(1, Ordering::SeqCst);
|
||||||
assert!(old_revision != usize::max_value(), "revision overflow");
|
assert!(old_revision != usize::max_value(), "revision overflow");
|
||||||
|
|
||||||
let result = Revision {
|
let result = Revision {
|
||||||
generation: 1 + old_revision as u64,
|
generation: 1 + old_revision as u64,
|
||||||
};
|
};
|
||||||
|
@ -191,6 +240,13 @@ struct SharedState<DB: Database> {
|
||||||
///
|
///
|
||||||
/// (Ideally, this should be `AtomicU64`, but that is currently unstable.)
|
/// (Ideally, this should be `AtomicU64`, but that is currently unstable.)
|
||||||
revision: AtomicUsize,
|
revision: AtomicUsize,
|
||||||
|
|
||||||
|
/// Counts the number of pending increments to the revision
|
||||||
|
/// counter. If this is non-zero, it means that the current
|
||||||
|
/// revision is out of date, and hence queries are free to
|
||||||
|
/// "short-circuit" their results if they learn that. See
|
||||||
|
/// `is_current_revision_canceled` for more information.
|
||||||
|
pending_revision_increments: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// State that will be specific to a single execution threads (when we
|
/// State that will be specific to a single execution threads (when we
|
||||||
|
|
Loading…
Reference in a new issue