use std::sync::Arc; use parking_lot::Condvar; use crate::cycle::CycleRecoveryStrategy; use crate::ingredient::Ingredient; use crate::jar::Jar; use crate::key::DependencyIndex; use crate::runtime::local_state::QueryOrigin; use crate::runtime::Runtime; use crate::{Database, DatabaseKeyIndex, Id, IngredientIndex}; use super::routes::Routes; use super::{ParallelDatabase, Revision}; /// The "storage" struct stores all the data for the jars. /// It is shared between the main database and any active snapshots. pub struct Storage { /// Data shared across all databases. shared: Arc>, /// The "ingredients" structure stores the information about how to find each ingredient in the database. /// It allows us to take the [`IngredientIndex`] assigned to a particular ingredient /// and get back a [`dyn Ingredient`][`Ingredient`] for the struct that stores its data. routes: Arc>, /// The runtime for this particular salsa database handle. /// Each handle gets its own runtime, but the runtimes have shared state between them.s runtime: Runtime, } /// Data shared between all threads. /// This is where the actual data for tracked functions, structs, inputs, etc lives, /// along with some coordination variables between treads. struct Shared { /// Contains the data for each jar in the database. /// Each jar stores its own structs in there that ultimately contain ingredients /// (types that implement the [`Ingredient`] trait, like [`crate::function::FunctionIngredient`]). jars: DB::Jars, /// Conditional variable that is used to coordinate cancellation. /// When the main thread writes to the database, it blocks until each of the snapshots can be cancelled. cvar: Condvar, } impl Default for Storage where DB: HasJars, { fn default() -> Self { let mut routes = Routes::new(); let jars = DB::create_jars(&mut routes); Self { shared: Arc::new(Shared { jars, cvar: Default::default(), }), routes: Arc::new(routes), runtime: Runtime::default(), } } } impl Storage where DB: HasJars, { pub fn snapshot(&self) -> Storage where DB: ParallelDatabase, { Self { shared: self.shared.clone(), routes: self.routes.clone(), runtime: self.runtime.snapshot(), } } pub fn jars(&self) -> (&DB::Jars, &Runtime) { (&self.shared.jars, &self.runtime) } pub fn runtime(&self) -> &Runtime { &self.runtime } /// Gets mutable access to the jars. This will trigger a new revision /// and it will also cancel any ongoing work in the current revision. /// Any actual writes that occur to data in a jar should use /// [`Runtime::report_tracked_write`]. pub fn jars_mut(&mut self) -> (&mut DB::Jars, &mut Runtime) { self.cancel_other_workers(); self.runtime.new_revision(); let routes = self.routes.clone(); let shared = Arc::get_mut(&mut self.shared).unwrap(); for route in routes.reset_routes() { route(&mut shared.jars).reset_for_new_revision(); } (&mut shared.jars, &mut self.runtime) } /// Sets cancellation flag and blocks until all other workers with access /// to this storage have completed. /// /// This could deadlock if there is a single worker with two handles to the /// same database! fn cancel_other_workers(&mut self) { loop { self.runtime.set_cancellation_flag(); // If we have unique access to the jars, we are done. if Arc::get_mut(&mut self.shared).is_some() { return; } // Otherwise, wait until some other storage entites have dropped. // We create a mutex here because the cvar api requires it, but we // don't really need one as the data being protected is actually // the jars above. let mutex = parking_lot::Mutex::new(()); let mut guard = mutex.lock(); self.shared.cvar.wait(&mut guard); } } pub fn ingredient(&self, ingredient_index: IngredientIndex) -> &dyn Ingredient { let route = self.routes.route(ingredient_index); route(&self.shared.jars) } } impl Drop for Shared where DB: HasJars, { fn drop(&mut self) { self.cvar.notify_all(); } } pub trait HasJars: HasJarsDyn + Sized { type Jars; fn jars(&self) -> (&Self::Jars, &Runtime); /// Gets mutable access to the jars. This will trigger a new revision /// and it will also cancel any ongoing work in the current revision. fn jars_mut(&mut self) -> (&mut Self::Jars, &mut Runtime); fn create_jars(routes: &mut Routes) -> Self::Jars; } pub trait DbWithJar: HasJar + Database { fn as_jar_db<'db>(&'db self) -> &>::DynDb where J: Jar<'db>; } pub trait JarFromJars: HasJars { fn jar_from_jars<'db>(jars: &Self::Jars) -> &J; fn jar_from_jars_mut<'db>(jars: &mut Self::Jars) -> &mut J; } pub trait HasJar { fn jar(&self) -> (&J, &Runtime); fn jar_mut(&mut self) -> (&mut J, &mut Runtime); } // Dyn friendly subset of HasJars pub trait HasJarsDyn { fn runtime(&self) -> &Runtime; fn maybe_changed_after(&self, input: DependencyIndex, revision: Revision) -> bool; fn cycle_recovery_strategy(&self, input: IngredientIndex) -> CycleRecoveryStrategy; fn origin(&self, input: DatabaseKeyIndex) -> Option; fn mark_validated_output(&self, executor: DatabaseKeyIndex, output: DependencyIndex); /// Invoked when `executor` used to output `stale_output` but no longer does. /// This method routes that into a call to the [`remove_stale_output`](`crate::ingredient::Ingredient::remove_stale_output`) /// method on the ingredient for `stale_output`. fn remove_stale_output(&self, executor: DatabaseKeyIndex, stale_output: DependencyIndex); /// Informs `ingredient` that the salsa struct with id `id` has been deleted. /// This means that `id` will not be used in this revision and hence /// any memoized values keyed by that struct can be discarded. /// /// In order to receive this callback, `ingredient` must have registered itself /// as a dependent function using /// [`SalsaStructInDb::register_dependent_fn`](`crate::salsa_struct::SalsaStructInDb::register_dependent_fn`). fn salsa_struct_deleted(&self, ingredient: IngredientIndex, id: Id); } pub trait HasIngredientsFor where I: IngredientsFor, { fn ingredient(&self) -> &I::Ingredients; fn ingredient_mut(&mut self) -> &mut I::Ingredients; } pub trait IngredientsFor { type Jar; type Ingredients; fn create_ingredients(routes: &mut Routes) -> Self::Ingredients where DB: DbWithJar + JarFromJars; }