salsa/tests/parallel/stress.rs

203 lines
5.1 KiB
Rust
Raw Normal View History

2019-07-02 11:40:43 +00:00
use rand::seq::SliceRandom;
2018-10-30 12:09:26 +00:00
use rand::Rng;
2019-07-02 11:40:43 +00:00
2018-10-30 12:09:26 +00:00
use salsa::ParallelDatabase;
2018-11-01 00:06:06 +00:00
use salsa::Snapshot;
2018-10-30 12:09:26 +00:00
use salsa::SweepStrategy;
use salsa::{Cancelled, Database};
2018-10-30 12:09:26 +00:00
// Number of operations the mutator (main) thread performs
const N_MUTATOR_OPS: usize = 100;
// Number of operations each launched reader thread performs
const N_READER_OPS: usize = 100;
// Query group exercised by the stress test: one input query (`a`) and two
// further queries (`b`, `c`) whose bodies are the free functions of the same
// names in this file.
#[salsa::query_group(Stress)]
trait StressDatabase: salsa::Database {
    // Input query; values are written with `set_a`.
    #[salsa::input]
    fn a(&self, key: usize) -> usize;

    // Reads `a(key)`.
    fn b(&self, key: usize) -> usize;

    // Reads `b(key)`.
    fn c(&self, key: usize) -> usize;
}
2021-05-17 16:59:28 +00:00
fn b(db: &dyn StressDatabase, key: usize) -> usize {
db.salsa_runtime().unwind_if_cancelled();
2021-05-17 16:59:28 +00:00
db.a(key)
2018-10-30 12:09:26 +00:00
}
2021-05-17 16:59:28 +00:00
fn c(db: &dyn StressDatabase, key: usize) -> usize {
2018-10-30 12:09:26 +00:00
db.b(key)
}
#[salsa::database(Stress)]
2018-10-30 12:09:26 +00:00
#[derive(Default)]
struct StressDatabaseImpl {
2020-07-02 10:31:02 +00:00
storage: salsa::Storage<Self>,
2018-10-30 12:09:26 +00:00
}
2020-07-02 10:31:02 +00:00
// Marks `StressDatabaseImpl` as a salsa database; the impl body is empty, so
// no trait items are overridden here.
impl salsa::Database for StressDatabaseImpl {}
2018-10-30 12:09:26 +00:00
impl salsa::ParallelDatabase for StressDatabaseImpl {
2018-11-01 00:06:06 +00:00
fn snapshot(&self) -> Snapshot<StressDatabaseImpl> {
Snapshot::new(StressDatabaseImpl {
2020-07-02 10:31:02 +00:00
storage: self.storage.snapshot(),
})
2018-10-30 12:09:26 +00:00
}
}
/// Selects which of the three queries (`a`, `b` or `c`) a read or GC
/// operation targets.
#[derive(Clone, Copy, Debug)]
enum Query {
    /// The input query `a`.
    A,
    /// The query `b`.
    B,
    /// The query `c`.
    C,
}
2018-10-30 12:09:26 +00:00
/// One step performed by the mutator ("main") thread of the stress test.
///
/// Derives `Debug` like the sibling `WriteOp` and `ReadOp` enums so that a
/// generated operation sequence can be printed when diagnosing a failure.
#[derive(Debug)]
enum MutatorOp {
    /// Apply a write to the database on the mutator thread.
    WriteOp(WriteOp),
    /// Snapshot the database and spawn a reader thread on the snapshot.
    LaunchReader {
        /// Operations the reader executes, in order.
        ops: Vec<ReadOp>,
        /// Whether the reader checks for cancellation before each operation.
        check_cancellation: bool,
    },
}
2018-10-30 12:09:26 +00:00
/// A mutation applied to the database by the mutator thread.
#[derive(Debug)]
enum WriteOp {
    /// `SetA(key, value)`: set input `a(key)` to `value`.
    SetA(usize, usize),
}
#[derive(Debug)]
enum ReadOp {
2018-10-30 12:09:26 +00:00
Get(Query, usize),
Gc(Query, SweepStrategy),
GcAll(SweepStrategy),
}
impl rand::distributions::Distribution<Query> for rand::distributions::Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> Query {
2019-07-02 11:40:43 +00:00
*[Query::A, Query::B, Query::C].choose(rng).unwrap()
2018-10-30 12:09:26 +00:00
}
}
impl rand::distributions::Distribution<MutatorOp> for rand::distributions::Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> MutatorOp {
2018-10-30 12:09:26 +00:00
if rng.gen_bool(0.5) {
MutatorOp::WriteOp(rng.gen())
} else {
MutatorOp::LaunchReader {
ops: (0..N_READER_OPS).map(|_| rng.gen()).collect(),
check_cancellation: rng.gen(),
}
2018-10-30 12:09:26 +00:00
}
}
}
impl rand::distributions::Distribution<WriteOp> for rand::distributions::Standard {
    /// Generates a `SetA` write with both key and value reduced to `0..10`.
    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> WriteOp {
        // Key is drawn before value.
        let raw_key: usize = rng.gen();
        let raw_value: usize = rng.gen();
        WriteOp::SetA(raw_key % 10, raw_value % 10)
    }
}
impl rand::distributions::Distribution<ReadOp> for rand::distributions::Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> ReadOp {
2018-10-30 12:09:26 +00:00
if rng.gen_bool(0.5) {
let query = rng.gen::<Query>();
2018-10-30 12:09:26 +00:00
let key = rng.gen::<usize>() % 10;
return ReadOp::Get(query, key);
2018-10-30 12:09:26 +00:00
}
2019-01-27 14:14:57 +00:00
let mut strategy = SweepStrategy::discard_outdated();
2018-10-30 12:09:26 +00:00
if rng.gen_bool(0.5) {
strategy = strategy.discard_values();
}
if rng.gen_bool(0.5) {
ReadOp::Gc(rng.gen::<Query>(), strategy)
2018-10-30 12:09:26 +00:00
} else {
ReadOp::GcAll(strategy)
2018-10-30 12:09:26 +00:00
}
}
}
/// Body of a reader thread: executes every operation in `ops`, in order,
/// against `db`.
///
/// When `check_cancellation` is true, the runtime is asked to unwind if
/// cancelled before each operation is executed.
fn db_reader_thread(db: &StressDatabaseImpl, ops: Vec<ReadOp>, check_cancellation: bool) {
    for read_op in ops {
        if check_cancellation {
            let runtime = db.salsa_runtime();
            runtime.unwind_if_cancelled();
        }

        read_op.execute(db);
    }
}
impl WriteOp {
    /// Applies this write to `db`.
    fn execute(self, db: &mut StressDatabaseImpl) {
        // `WriteOp` has a single variant, so this pattern is irrefutable.
        let WriteOp::SetA(key, value) = self;
        db.set_a(key, value);
    }
}
impl ReadOp {
fn execute(self, db: &StressDatabaseImpl) {
match self {
ReadOp::Get(query, key) => match query {
2018-10-31 10:03:33 +00:00
Query::A => {
db.a(key);
2018-10-30 12:09:26 +00:00
}
2018-10-31 10:03:33 +00:00
Query::B => {
let _ = db.b(key);
2018-10-30 12:09:26 +00:00
}
2018-10-31 10:03:33 +00:00
Query::C => {
let _ = db.c(key);
}
},
ReadOp::Gc(query, strategy) => match query {
2018-10-31 10:03:33 +00:00
Query::A => {
AQuery.in_db(db).sweep(strategy);
2018-10-31 10:03:33 +00:00
}
Query::B => {
BQuery.in_db(db).sweep(strategy);
2018-10-31 10:03:33 +00:00
}
Query::C => {
CQuery.in_db(db).sweep(strategy);
2018-10-31 10:03:33 +00:00
}
},
ReadOp::GcAll(strategy) => {
2018-10-30 12:09:26 +00:00
db.sweep_all(strategy);
}
}
}
}
/// Randomised stress test: the main thread interleaves writes to input `a`
/// with launching reader threads on database snapshots, then joins every
/// reader.
///
/// A reader that ends with a caught `Cancelled` is tolerated; any other
/// panic in a reader fails the test at `join().unwrap()`.
#[test]
fn stress_test() {
    let mut db = StressDatabaseImpl::default();

    // Seed every key the random ops can touch (keys are always in 0..10).
    for key in 0..10 {
        db.set_a(key, key);
    }

    let mut rng = rand::thread_rng();

    // generate the ops that the mutator thread will perform
    let mutator_ops: Vec<MutatorOp> = (0..N_MUTATOR_OPS).map(|_| rng.gen()).collect();

    // execute the "main thread", which sometimes snapshots off other threads
    let mut reader_handles = vec![];
    for mutator_op in mutator_ops {
        match mutator_op {
            MutatorOp::WriteOp(write) => write.execute(&mut db),
            MutatorOp::LaunchReader {
                ops,
                check_cancellation,
            } => {
                // Take the snapshot on the main thread, before spawning.
                let snapshot = db.snapshot();
                let handle = std::thread::spawn(move || {
                    Cancelled::catch(|| db_reader_thread(&snapshot, ops, check_cancellation))
                });
                reader_handles.push(handle);
            }
        }
    }

    for handle in reader_handles {
        // `unwrap` propagates reader panics; `.ok()` discards the
        // `Result` returned by `Cancelled::catch`.
        handle.join().unwrap().ok();
    }
}