diff --git a/Cargo.lock b/Cargo.lock index 990eb2e0..1fbeacf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -423,16 +423,6 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" -[[package]] -name = "crossbeam-channel" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" -dependencies = [ - "cfg-if", - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.2" @@ -677,7 +667,9 @@ dependencies = [ "itertools 0.12.1", "loro 0.16.2", "loro 0.16.2 (git+https://github.com/loro-dev/loro.git?rev=90470658435ec4c62b5af59ebb82fe9e1f5aa761)", + "num_cpus", "rand", + "rayon", "serde_json", "tabled 0.10.0", "tracing", @@ -796,15 +788,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hermit-abi" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.3.9" @@ -1421,11 +1404,11 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.2.6", + "hermit-abi 0.3.9", "libc", ] @@ -1733,9 +1716,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.6.1" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db3a213adf02b3bcfd2d3846bb41cb22857d131789e01df434fb7e7bc0759b7" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" dependencies = [ "either", "rayon-core", @@ -1743,14 +1726,12 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.10.1" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cac410af5d00ab6884528b4ab69d1e8e146e8d471201800fa1b4524126de6ad3" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" dependencies = [ - "crossbeam-channel", "crossbeam-deque", "crossbeam-utils", - "num_cpus", ] [[package]] diff --git a/crates/fuzz/Cargo.toml b/crates/fuzz/Cargo.toml index eb4fd1e7..7ceea91e 100644 --- a/crates/fuzz/Cargo.toml +++ b/crates/fuzz/Cargo.toml @@ -18,6 +18,8 @@ arbitrary = "1" tabled = "0.10" rand = "0.8.5" serde_json = "1" +num_cpus = "1.16.0" +rayon = "1.10.0" [dev-dependencies] ctor = "0.2" diff --git a/crates/fuzz/src/crdt_fuzzer.rs b/crates/fuzz/src/crdt_fuzzer.rs index be57c8ec..013a7aa6 100644 --- a/crates/fuzz/src/crdt_fuzzer.rs +++ b/crates/fuzz/src/crdt_fuzzer.rs @@ -1,11 +1,15 @@ use std::{ + collections::VecDeque, fmt::{Debug, Display}, + sync::{Arc, Mutex}, + thread, time::Instant, }; use arbitrary::Arbitrary; use fxhash::FxHashSet; use loro::{ContainerType, Frontiers}; +use rayon::iter::ParallelExtend; use tabled::TableIteratorExt; use tracing::{info, info_span}; @@ -334,9 +338,9 @@ pub fn test_multi_sites(site_num: u8, fuzz_targets: Vec, actions: &m pub fn minify_error(site_num: u8, f: F, normalize: N, actions: Vec) where - F: Fn(u8, &mut [T]), + F: Fn(u8, &mut [T]) + Send + Sync + 'static, N: Fn(u8, &mut [T]) -> Vec, - T: Clone + Debug, + T: Clone + Debug + Send + 'static, { std::panic::set_hook(Box::new(|_info| { // ignore panic output @@ -362,54 +366,81 @@ where return; } - let mut minified = actions.clone(); - let mut candidates = Vec::new(); + let minified = Arc::new(Mutex::new(actions.clone())); + let candidates = Arc::new(Mutex::new(VecDeque::new())); println!("Setup candidates..."); for i in 0..actions.len() { let mut new = actions.clone(); new.remove(i); - candidates.push(new); + candidates.lock().unwrap().push_back(new); } println!("Minifying..."); let start = Instant::now(); - while let Some(candidate) = candidates.pop() { - let f_ref: *const _ = &f; - let f_ref: usize = f_ref as usize; - let mut actions_clone = candidate.clone(); - let action_ref: usize = (&mut actions_clone) as *mut _ as usize; - #[allow(clippy::blocks_in_conditions)] - if std::panic::catch_unwind(|| { - // SAFETY: test - let f = unsafe { &*(f_ref as *const F) }; - // SAFETY: test - let actions_ref = unsafe { &mut *(action_ref as *mut Vec) }; - f(site_num, actions_ref); - }) - .is_err() - { - for i in 0..candidate.len() { - let mut new = candidate.clone(); - new.remove(i); - candidates.push(new); + // Get the number of logical cores available on the system + let num_cores = num_cpus::get() / 2; + let f = Arc::new(f); + println!("start with {} threads", num_cores); + let mut threads = Vec::new(); + for _i in 0..num_cores { + let candidates = candidates.clone(); + let minified = minified.clone(); + let f = f.clone(); + threads.push(thread::spawn(move || { + loop { + let candidate = { + let Some(candidate) = candidates.lock().unwrap().pop_back() else { + return; + }; + candidate + }; + + let f_ref: *const _ = &f; + let f_ref: usize = f_ref as usize; + let mut actions_clone = candidate.clone(); + let action_ref: usize = (&mut actions_clone) as *mut _ as usize; + #[allow(clippy::blocks_in_conditions)] + if std::panic::catch_unwind(|| { + // SAFETY: test + let f = unsafe { &*(f_ref as *const F) }; + // SAFETY: test + let actions_ref = unsafe { &mut *(action_ref as *mut Vec) }; + f(site_num, actions_ref); + }) + .is_err() + { + let mut candidates = candidates.lock().unwrap(); + let mut minified = minified.lock().unwrap(); + for i in 0..candidate.len() { + let mut new = candidate.clone(); + new.remove(i); + candidates.push_back(new); + } + if candidate.len() < minified.len() { + *minified = candidate; + println!("New min len={}", minified.len()); + } + + if candidates.len() > 60 { + candidates.drain(0..30); + } + } + + if start.elapsed().as_secs() > 10 && minified.lock().unwrap().len() <= 4 { + break; + } + if start.elapsed().as_secs() > 60 { + break; + } } - if candidate.len() < minified.len() { - minified = candidate; - println!("New min len={}", minified.len()); - } - if candidates.len() > 40 { - candidates.drain(0..30); - } - } - if start.elapsed().as_secs() > 10 && minified.len() <= 4 { - break; - } - if start.elapsed().as_secs() > 60 { - break; - } + })); } - let minified = normalize(site_num, &mut minified); + for thread in threads.into_iter() { + thread.join().unwrap(); + } + + let minified = normalize(site_num, &mut minified.lock().unwrap()); println!( "Old Length {}, New Length {}", actions.len(), @@ -417,7 +448,15 @@ where ); dbg!(&minified); if actions.len() > minified.len() { - minify_error(site_num, f, normalize, minified); + minify_error( + site_num, + match Arc::try_unwrap(f) { + Ok(f) => f, + Err(_) => panic!(), + }, + normalize, + minified, + ); } }