From 0d13c9562c3a150c1a2222845fb6851d6ebe5a75 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Mon, 25 Jul 2022 15:32:10 +0800 Subject: [PATCH 01/16] refactor: undo redo use counter span undo & redo should be a local only operation --- crates/loro-core/src/dag.rs | 0 crates/loro-core/src/lib.rs | 3 +- crates/loro-core/src/log_store.rs | 2 +- crates/loro-core/src/op.rs | 8 +- crates/loro-core/src/op/insert_content.rs | 1 + crates/loro-core/src/op/op_content.rs | 11 +- crates/loro-core/src/{id_span.rs => span.rs} | 106 +++++++++++++------ 7 files changed, 94 insertions(+), 37 deletions(-) create mode 100644 crates/loro-core/src/dag.rs rename crates/loro-core/src/{id_span.rs => span.rs} (60%) diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs new file mode 100644 index 00000000..e69de29b diff --git a/crates/loro-core/src/lib.rs b/crates/loro-core/src/lib.rs index a8ff0038..b8c91195 100644 --- a/crates/loro-core/src/lib.rs +++ b/crates/loro-core/src/lib.rs @@ -6,16 +6,17 @@ pub mod change; pub mod configure; pub mod container; +pub mod dag; pub mod id; pub mod op; pub mod version; mod error; -mod id_span; mod log_store; mod loro; mod smstring; mod snapshot; +mod span; mod tests; mod value; diff --git a/crates/loro-core/src/log_store.rs b/crates/loro-core/src/log_store.rs index e6bf1039..453e9388 100644 --- a/crates/loro-core/src/log_store.rs +++ b/crates/loro-core/src/log_store.rs @@ -14,8 +14,8 @@ use crate::{ configure::Configure, container::{Container, ContainerID, ContainerManager}, id::{ClientID, Counter}, - id_span::IdSpan, op::OpProxy, + span::IdSpan, version::TotalOrderStamp, Lamport, Op, Timestamp, ID, }; diff --git a/crates/loro-core/src/op.rs b/crates/loro-core/src/op.rs index a5dac56c..7d6712f0 100644 --- a/crates/loro-core/src/op.rs +++ b/crates/loro-core/src/op.rs @@ -1,4 +1,8 @@ -use crate::{container::ContainerID, id::ID, id_span::IdSpan}; +use crate::{ + container::ContainerID, + id::ID, + span::{CounterSpan, IdSpan}, +}; use 
rle::{HasLength, Mergable, RleVec, Sliceable}; mod insert_content; mod op_content; @@ -47,7 +51,7 @@ impl Op { } #[inline] - pub fn new_delete_op(id: ID, container: ContainerID, target: RleVec) -> Self { + pub fn new_delete_op(id: ID, container: ContainerID, target: RleVec) -> Self { Op::new(id, OpContent::Undo { target }, container) } diff --git a/crates/loro-core/src/op/insert_content.rs b/crates/loro-core/src/op/insert_content.rs index e46151a6..7fb08a59 100644 --- a/crates/loro-core/src/op/insert_content.rs +++ b/crates/loro-core/src/op/insert_content.rs @@ -32,6 +32,7 @@ pub trait InsertContent: HasLength + std::fmt::Debug + Any + MergeableContent + SliceableContent + CloneContent { fn id(&self) -> ContentType; + // TODO: provide an encoding method } impl SliceableContent for T { diff --git a/crates/loro-core/src/op/op_content.rs b/crates/loro-core/src/op/op_content.rs index cdc3e66c..77372483 100644 --- a/crates/loro-core/src/op/op_content.rs +++ b/crates/loro-core/src/op/op_content.rs @@ -1,14 +1,19 @@ use rle::{HasLength, Mergable, RleVec, Sliceable}; -use crate::{container::ContainerID, id::ID, id_span::IdSpan, OpType}; +use crate::{ + container::ContainerID, + id::ID, + span::{CounterSpan, IdSpan}, + OpType, +}; use super::{InsertContent, MergeableContent}; #[derive(Debug)] pub enum OpContent { Normal { content: Box }, - Undo { target: RleVec }, - Redo { target: RleVec }, + Undo { target: RleVec }, + Redo { target: RleVec }, } impl OpContent { diff --git a/crates/loro-core/src/id_span.rs b/crates/loro-core/src/span.rs similarity index 60% rename from crates/loro-core/src/id_span.rs rename to crates/loro-core/src/span.rs index 23d2e954..966bf20d 100644 --- a/crates/loro-core/src/id_span.rs +++ b/crates/loro-core/src/span.rs @@ -1,16 +1,20 @@ -use crate::id::{ClientID, ID}; +use crate::id::{ClientID, Counter, ID}; use rle::{HasLength, Mergable, Slice, Sliceable}; #[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub struct IdSpan { - pub client_id: ClientID, - 
pub from: usize, - pub to: usize, +pub struct CounterSpan { + pub from: Counter, + pub to: Counter, } -impl IdSpan { +impl CounterSpan { #[inline] - pub fn min(&self) -> usize { + pub fn new(from: Counter, to: Counter) -> Self { + CounterSpan { from, to } + } + + #[inline] + pub fn min(&self) -> Counter { if self.from < self.to { self.from } else { @@ -19,7 +23,7 @@ impl IdSpan { } #[inline] - pub fn max(&self) -> usize { + pub fn max(&self) -> Counter { if self.from > self.to { self.from } else { @@ -28,44 +32,90 @@ impl IdSpan { } } -impl HasLength for IdSpan { +impl HasLength for CounterSpan { + #[inline] fn len(&self) -> usize { if self.to > self.from { - self.to - self.from + (self.to - self.from) as usize } else { - self.from - self.to + (self.from - self.to) as usize } } } -impl Sliceable for IdSpan { +impl Sliceable for CounterSpan { fn slice(&self, from: usize, to: usize) -> Self { assert!(from <= to); let len = to - from; assert!(len <= self.len()); if self.from < self.to { - IdSpan { - client_id: self.client_id, - from: self.from + from, - to: self.from + to, + CounterSpan { + from: self.from + from as Counter, + to: self.from + to as Counter, } } else { - IdSpan { - client_id: self.client_id, - from: self.from - from, - to: self.from - to, + CounterSpan { + from: self.from - from as Counter, + to: self.from - to as Counter, } } } } +impl Mergable for CounterSpan { + #[inline] + fn is_mergable(&self, other: &Self, _: &()) -> bool { + self.to == other.from + } + + #[inline] + fn merge(&mut self, other: &Self, _: &()) { + self.to = other.to; + } +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct IdSpan { + pub client_id: ClientID, + pub counter: CounterSpan, +} + +impl IdSpan { + #[inline] + pub fn min(&self) -> Counter { + self.counter.min() + } + + #[inline] + pub fn max(&self) -> Counter { + self.counter.max() + } +} + +impl HasLength for IdSpan { + #[inline] + fn len(&self) -> usize { + self.counter.len() + } +} + +impl Sliceable for IdSpan 
{ + #[inline] + fn slice(&self, from: usize, to: usize) -> Self { + IdSpan { + client_id: self.client_id, + counter: self.counter.slice(from, to), + } + } +} + impl Mergable for IdSpan { fn is_mergable(&self, other: &Self, _: &()) -> bool { - self.client_id == other.client_id && self.to == other.from + self.client_id == other.client_id && self.counter.is_mergable(&other.counter, &()) } fn merge(&mut self, other: &Self, _: &()) { - self.to = other.to; + self.counter.merge(&other.counter, &()) } } @@ -82,8 +132,7 @@ mod test_id_span { $( id_spans.push(IdSpan { client_id: $client_id, - from: $from, - to: $to, + counter: CounterSpan::new($from, $to), }); )* id_spans @@ -96,22 +145,19 @@ mod test_id_span { let mut id_span_vec = RleVec::new(); id_span_vec.push(IdSpan { client_id: 0, - from: 0, - to: 2, + counter: CounterSpan::new(0, 2), }); assert_eq!(id_span_vec.merged_len(), 1); assert_eq!(id_span_vec.len(), 2); id_span_vec.push(IdSpan { client_id: 0, - from: 2, - to: 4, + counter: CounterSpan::new(2, 4), }); assert_eq!(id_span_vec.merged_len(), 1); assert_eq!(id_span_vec.len(), 4); id_span_vec.push(IdSpan { client_id: 2, - from: 2, - to: 4, + counter: CounterSpan::new(2, 4), }); assert_eq!(id_span_vec.merged_len(), 2); assert_eq!(id_span_vec.len(), 6); From 1ca2b4226e201e956218d69c140a1151d1cb106c Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Wed, 27 Jul 2022 02:24:25 +0800 Subject: [PATCH 02/16] feat: dag init --- crates/loro-core/justfile | 18 + .../src/container/map/map_container.rs | 18 + crates/loro-core/src/container/map/tests.rs | 2 +- crates/loro-core/src/dag.rs | 135 +++++++ crates/loro-core/src/dag/test.rs | 343 ++++++++++++++++++ crates/loro-core/src/macros.rs | 65 ++++ crates/loro-core/src/span.rs | 51 ++- crates/loro-core/src/value.rs | 70 +++- crates/rle/Cargo.toml | 1 + crates/rle/src/lib.rs | 6 +- crates/rle/src/rle_trait.rs | 44 +++ crates/rle/src/{rle.rs => rle_vec.rs} | 71 ++-- 12 files changed, 770 insertions(+), 54 deletions(-) create mode 
100644 crates/loro-core/justfile create mode 100644 crates/loro-core/src/dag/test.rs create mode 100644 crates/rle/src/rle_trait.rs rename crates/rle/src/{rle.rs => rle_vec.rs} (92%) diff --git a/crates/loro-core/justfile b/crates/loro-core/justfile new file mode 100644 index 00000000..164a1fb4 --- /dev/null +++ b/crates/loro-core/justfile @@ -0,0 +1,18 @@ +build: + cargo build + +test: + cargo nextest run + +# test without proptest +test-fast: + RUSTFLAGS='--cfg no_proptest' cargo nextest run + +check-unsafe: + env RUSTFLAGS="-Funsafe-code --cap-lints=warn" cargo check + +deny: + cargo deny check + +crev: + cargo crev crate check diff --git a/crates/loro-core/src/container/map/map_container.rs b/crates/loro-core/src/container/map/map_container.rs index 08159f58..3baa62bb 100644 --- a/crates/loro-core/src/container/map/map_container.rs +++ b/crates/loro-core/src/container/map/map_container.rs @@ -70,6 +70,15 @@ impl MapContainer { }, }]); + if self.value.is_some() { + self.value + .as_mut() + .unwrap() + .as_map_mut() + .unwrap() + .insert(key.clone(), value.clone().into()); + } + self.state.insert( key, ValueSlot { @@ -120,6 +129,15 @@ impl Container for MapContainer { counter: op.id().counter, }, ); + + if self.value.is_some() { + self.value + .as_mut() + .unwrap() + .as_map_mut() + .unwrap() + .insert(v.key.clone(), v.value.clone().into()); + } } } _ => unreachable!(), diff --git a/crates/loro-core/src/container/map/tests.rs b/crates/loro-core/src/container/map/tests.rs index 10572950..d7f67222 100644 --- a/crates/loro-core/src/container/map/tests.rs +++ b/crates/loro-core/src/container/map/tests.rs @@ -48,7 +48,7 @@ mod map_proptest { map.insert(k.clone(), v.clone()); container.insert(k.clone().into(), v.clone()); let snapshot = container.get_value(); - for (key, value) in snapshot.to_map().unwrap().iter() { + for (key, value) in snapshot.as_map().unwrap().iter() { assert_eq!(map.get(&key.to_string()).map(|x|x.clone().into()), Some(value.clone())); } } diff 
--git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index e69de29b..2e661aee 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -0,0 +1,135 @@ +use std::{ + collections::{BinaryHeap, HashMap, VecDeque}, + ops::Range, +}; + +use fxhash::FxHashMap; +mod test; + +use crate::{ + change::Lamport, + id::{ClientID, Counter, ID}, + span::{CounterSpan, IdSpan}, +}; + +pub trait DagNode { + fn dag_id_start(&self) -> ID; + fn lamport_start(&self) -> Lamport; + fn len(&self) -> usize; + fn deps(&self) -> &Vec; + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn dag_id_span(&self) -> IdSpan { + let id = self.dag_id_start(); + IdSpan { + client_id: id.client_id, + counter: CounterSpan::new(id.counter, id.counter + self.len() as Counter), + } + } +} + +pub(crate) trait Dag { + type Node: DagNode; + + fn get(&self, id: ID) -> Option<&Self::Node>; + fn contains(&self, id: ID) -> bool; + fn frontier(&self) -> &[ID]; + fn roots(&self) -> Vec<&Self::Node>; + + fn get_common_ancestor(&self, a: ID, b: ID) -> Option { + if a.client_id == b.client_id { + if a.counter <= b.counter { + Some(a) + } else { + Some(b) + } + } else { + #[derive(Debug, PartialEq, Eq)] + struct OrdId { + id: ID, + lamport: Lamport, + } + + impl PartialOrd for OrdId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.lamport.cmp(&other.lamport)) + } + } + + impl Ord for OrdId { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.lamport.cmp(&other.lamport) + } + } + + let mut a_map: HashMap, _> = FxHashMap::default(); + let mut b_map: HashMap, _> = FxHashMap::default(); + let mut a_heap: BinaryHeap = BinaryHeap::new(); + let mut b_heap: BinaryHeap = BinaryHeap::new(); + { + let a = self.get(a).unwrap(); + let b = self.get(b).unwrap(); + a_heap.push(OrdId { + id: a.dag_id_start(), + lamport: a.lamport_start() + a.len() as Lamport, + }); + b_heap.push(OrdId { + id: b.dag_id_start(), + lamport: b.lamport_start() + b.len() as Lamport, + }); + } + + 
while !a_heap.is_empty() || !b_heap.is_empty() { + let (a_heap, b_heap, a_map, b_map) = + if a_heap.peek().map(|x| x.lamport).unwrap_or(0) + < b_heap.peek().map(|x| x.lamport).unwrap_or(0) + { + // swap + (&mut b_heap, &mut a_heap, &mut b_map, &mut a_map) + } else { + (&mut a_heap, &mut b_heap, &mut a_map, &mut b_map) + }; + + while !a_heap.is_empty() + && a_heap.peek().map(|x| x.lamport).unwrap_or(0) + >= b_heap.peek().map(|x| x.lamport).unwrap_or(0) + { + let id = a_heap.pop().unwrap().id; + if let Some(range) = b_map.get(&id.client_id) { + if range.contains(&id.counter) { + return Some(id); + } + } + + let a = self.get(id).unwrap(); + for dep in a.deps() { + a_heap.push(OrdId { + id: *dep, + lamport: a.lamport_start() + a.len() as Lamport, + }); + } + if let Some(range) = a_map.get_mut(&id.client_id) { + range.start = a.dag_id_start().counter; + } else { + let span = a.dag_id_span(); + a_map.insert(id.client_id, span.counter.from..span.counter.to); + } + } + } + + None + } + } +} + +fn update_frontier(frontier: &mut Vec, new_node_id: ID, new_node_deps: &[ID]) { + frontier.retain(|x| { + !new_node_deps + .iter() + .any(|y| y.client_id == x.client_id && y.counter >= x.counter) + }); + frontier.push(new_node_id); +} diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs new file mode 100644 index 00000000..695a5d1b --- /dev/null +++ b/crates/loro-core/src/dag/test.rs @@ -0,0 +1,343 @@ +#![cfg(test)] + +use proptest::proptest; + +use super::*; +use crate::{ + change::Lamport, + id::{ClientID, Counter, ID}, + span::{CounterSpan, IdSpan}, +}; +use std::collections::HashSet; +use std::iter::FromIterator; + +#[derive(Debug, PartialEq, Eq, Clone)] +struct TestNode { + id: ID, + lamport: Lamport, + len: usize, + deps: Vec, +} + +impl TestNode { + fn new(id: ID, lamport: Lamport, deps: Vec, len: usize) -> Self { + Self { + id, + lamport, + deps, + len, + } + } +} + +impl DagNode for TestNode { + fn dag_id_start(&self) -> ID { + self.id + } + fn 
lamport_start(&self) -> Lamport { + self.lamport + } + fn len(&self) -> usize { + self.len + } + fn deps(&self) -> &Vec { + &self.deps + } +} + +#[derive(Debug, PartialEq, Eq)] +struct TestDag { + nodes: FxHashMap>, + frontier: Vec, + version_vec: FxHashMap, + next_lamport: Lamport, + client_id: ClientID, +} + +impl Dag for TestDag { + type Node = TestNode; + + fn get(&self, id: ID) -> Option<&Self::Node> { + self.nodes.get(&id.client_id)?.iter().find(|node| { + id.counter >= node.id.counter && id.counter < node.id.counter + node.len as Counter + }) + } + fn frontier(&self) -> &[ID] { + &self.frontier + } + + fn roots(&self) -> Vec<&Self::Node> { + self.nodes.iter().map(|(_, v)| &v[0]).collect() + } + + fn contains(&self, id: ID) -> bool { + self.version_vec + .get(&id.client_id) + .and_then(|x| if *x > id.counter { Some(()) } else { None }) + .is_some() + } +} + +impl TestDag { + pub fn new(client_id: ClientID) -> Self { + Self { + nodes: FxHashMap::default(), + frontier: Vec::new(), + version_vec: FxHashMap::default(), + next_lamport: 0, + client_id, + } + } + + fn push(&mut self, len: usize) { + let client_id = self.client_id; + let counter = self.version_vec.entry(client_id).or_insert(0); + let id = ID::new(client_id, *counter); + *counter += len as u32; + let deps = std::mem::replace(&mut self.frontier, vec![id]); + self.nodes + .entry(client_id) + .or_insert(vec![]) + .push(TestNode::new(id, self.next_lamport, deps, len)); + self.next_lamport += len as u32; + } + + fn merge(&mut self, other: &TestDag) { + let mut pending = Vec::new(); + for (_, nodes) in other.nodes.iter() { + for (i, node) in nodes.iter().enumerate() { + if self._try_push_node(node, &mut pending, i) { + break; + } + } + } + + let mut current = pending; + let mut pending = Vec::new(); + while !pending.is_empty() || !current.is_empty() { + if current.is_empty() { + std::mem::swap(&mut pending, &mut current); + } + + let (client_id, index) = current.pop().unwrap(); + let node_vec = 
other.nodes.get(&client_id).unwrap(); + #[allow(clippy::needless_range_loop)] + for i in index..node_vec.len() { + let node = &node_vec[i]; + if self._try_push_node(node, &mut pending, i) { + break; + } + } + } + } + + fn _try_push_node( + &mut self, + node: &TestNode, + pending: &mut Vec<(u64, usize)>, + i: usize, + ) -> bool { + let client_id = node.id.client_id; + if self.contains(node.id) { + return false; + } + if node.deps.iter().any(|dep| !self.contains(*dep)) { + pending.push((client_id, i)); + return true; + } + update_frontier(&mut self.frontier, node.id, &node.deps); + self.nodes + .entry(client_id) + .or_insert(vec![]) + .push(node.clone()); + self.version_vec + .insert(client_id, node.id.counter + node.len as u32); + self.next_lamport = self.next_lamport.max(node.lamport + node.len as u32); + false + } +} + +#[test] +fn test_dag() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + a.push(1); + assert_eq!(a.frontier().len(), 1); + assert_eq!(a.frontier()[0].counter, 0); + b.push(1); + a.merge(&b); + assert_eq!(a.frontier().len(), 2); + a.push(1); + assert_eq!(a.frontier().len(), 1); + // a: 0 --(merge)--- 1 + // ↑ + // | + // b: 0 ---- + assert_eq!( + a.frontier()[0], + ID { + client_id: 0, + counter: 1 + } + ); + + // a: 0 --(merge)--- 1 --- 2 ------- + // ↑ | + // | ↓ + // b: 0 ------------1----------(merge) + a.push(1); + b.push(1); + b.merge(&a); + assert_eq!(b.next_lamport, 3); + assert_eq!(b.frontier().len(), 2); + assert_eq!( + b.get_common_ancestor(ID::new(0, 2), ID::new(1, 1)), + Some(ID::new(1, 0)) + ); +} + +#[cfg(not(no_proptest))] +mod find_common_ancestors { + use proptest::prelude::*; + + use crate::{array_mut_ref, unsafe_array_mut_ref}; + + use super::*; + + #[derive(Debug, Clone, Copy)] + struct Interaction { + dag_idx: usize, + merge_with: Option, + len: usize, + } + + prop_compose! 
{ + fn gen_interaction(num: usize)(dag_idx in 0..num, merge_with in 0..num, length in 1..10, should_merge in 0..2) -> Interaction { + Interaction { + dag_idx, + merge_with: if should_merge == 1 && merge_with != dag_idx { Some(merge_with) } else { None }, + len: length as usize, + } + } + } + + proptest! { + #[test] + fn test_2dags( + before_merged_insertions in prop::collection::vec(gen_interaction(2), 0..300), + after_merged_insertions in prop::collection::vec(gen_interaction(2), 0..300) + ) { + test(2, before_merged_insertions, after_merged_insertions)?; + } + + #[test] + fn test_3dags( + before_merged_insertions in prop::collection::vec(gen_interaction(3), 0..300), + after_merged_insertions in prop::collection::vec(gen_interaction(3), 0..300) + ) { + test(3, before_merged_insertions, after_merged_insertions)?; + } + + #[test] + fn test_4dags( + before_merged_insertions in prop::collection::vec(gen_interaction(4), 0..300), + after_merged_insertions in prop::collection::vec(gen_interaction(4), 0..300) + ) { + test(4, before_merged_insertions, after_merged_insertions)?; + } + + #[test] + fn test_10dags( + before_merged_insertions in prop::collection::vec(gen_interaction(10), 0..300), + after_merged_insertions in prop::collection::vec(gen_interaction(10), 0..300) + ) { + test(10, before_merged_insertions, after_merged_insertions)?; + } + + #[test] + fn test_100dags( + before_merged_insertions in prop::collection::vec(gen_interaction(100), 0..2000), + after_merged_insertions in prop::collection::vec(gen_interaction(100), 0..2000) + ) { + test(100, before_merged_insertions, after_merged_insertions)?; + } + } + + fn preprocess(interactions: &mut [Interaction], num: i32) { + for interaction in interactions.iter_mut() { + interaction.dag_idx %= num as usize; + if let Some(ref mut merge_with) = interaction.merge_with { + *merge_with %= num as usize; + if *merge_with == interaction.dag_idx { + *merge_with = (*merge_with + 1) % num as usize; + } + } + } + } + + fn test( + 
dag_num: i32, + mut before_merge_insertion: Vec, + mut after_merge_insertion: Vec, + ) -> Result<(), TestCaseError> { + preprocess(&mut before_merge_insertion, dag_num); + preprocess(&mut after_merge_insertion, dag_num); + let mut dags = Vec::new(); + for i in 0..dag_num { + dags.push(TestDag::new(i as ClientID)); + } + + for interaction in before_merge_insertion { + apply(interaction, &mut dags); + } + + let (dag0,): (&mut TestDag,) = unsafe_array_mut_ref!(&mut dags, [0]); + for dag in &dags[1..] { + dag0.merge(dag); + } + + dag0.push(1); + let expected = dag0.frontier()[0]; + for dag in &mut dags[1..] { + dag.merge(dag0); + } + for interaction in after_merge_insertion.iter_mut() { + if let Some(merge) = interaction.merge_with { + // odd dag merges with the odd + // even dag merges with the even + if merge % 2 != interaction.dag_idx % 2 { + interaction.merge_with = None; + } + } + + apply(*interaction, &mut dags); + } + + let (dag0, dag1) = array_mut_ref!(&mut dags, [0, 1]); + dag1.push(1); + dag0.merge(dag1); + // dbg!(dag0, dag1, expected); + let actual = dags[0].get_common_ancestor( + dags[0].nodes.get(&0).unwrap().last().unwrap().id, + dags[1].nodes.get(&1).unwrap().last().unwrap().id, + ); + prop_assert_eq!(actual.unwrap(), expected); + Ok(()) + } + + fn apply(interaction: Interaction, dags: &mut [TestDag]) { + let Interaction { + dag_idx, + len, + merge_with, + } = interaction; + if let Some(merge_with) = merge_with { + let (dag, merge_target): (&mut TestDag, &mut TestDag) = + array_mut_ref!(dags, [dag_idx, merge_with]); + dag.push(len); + dag.merge(merge_target); + } else { + dags[dag_idx].push(len); + } + } +} diff --git a/crates/loro-core/src/macros.rs b/crates/loro-core/src/macros.rs index 21e3ad69..e7994ed9 100644 --- a/crates/loro-core/src/macros.rs +++ b/crates/loro-core/src/macros.rs @@ -20,3 +20,68 @@ macro_rules! fx_map { } }; } + +#[macro_export] +macro_rules! 
unsafe_array_mut_ref { + ($arr:expr, [$($idx:expr),*]) => { + { + unsafe { + ( + $( + { &mut *(&mut $arr[$idx] as *mut _) } + ),*, + ) + } + } + } +} + +#[macro_export] +macro_rules! array_mut_ref { + ($arr:expr, [$a0:expr, $a1:expr]) => {{ + #[inline] + fn borrow_mut_ref(arr: &mut [T], a0: usize, a1: usize) -> (&mut T, &mut T) { + debug_assert!(a0 != a1); + unsafe { + ( + &mut *(&mut arr[a0] as *mut _), + &mut *(&mut arr[a1] as *mut _), + ) + } + } + + borrow_mut_ref($arr, $a0, $a1) + }}; + ($arr:expr, [$a0:expr, $a1:expr, $a2:expr]) => {{ + #[inline] + fn borrow_mut_ref( + arr: &mut [T], + a0: usize, + a1: usize, + a2: usize, + ) -> (&mut T, &mut T, &mut T) { + debug_assert!(a0 != a1 && a1 != a2 && a0 != a2); + unsafe { + ( + &mut *(&mut arr[a0] as *mut _), + &mut *(&mut arr[a1] as *mut _), + &mut *(&mut arr[a2] as *mut _), + ) + } + } + + borrow_mut_ref($arr, $a0, $a1, $a2) + }}; +} + +#[test] +fn test_macro() { + let mut arr = vec![100, 101, 102, 103]; + let (a, b, c) = array_mut_ref!(&mut arr, [1, 2, 3]); + assert_eq!(*a, 101); + assert_eq!(*b, 102); + *a = 50; + *b = 51; + assert!(arr[1] == 50); + assert!(arr[2] == 51); +} diff --git a/crates/loro-core/src/span.rs b/crates/loro-core/src/span.rs index 966bf20d..9eeb2b7d 100644 --- a/crates/loro-core/src/span.rs +++ b/crates/loro-core/src/span.rs @@ -1,6 +1,8 @@ use crate::id::{ClientID, Counter, ID}; use rle::{HasLength, Mergable, Slice, Sliceable}; +/// [from, to) +/// this is different from [std::ops::Range] because `from` may be greater than `to` #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub struct CounterSpan { pub from: Counter, @@ -18,7 +20,7 @@ impl CounterSpan { if self.from < self.to { self.from } else { - self.to + self.to + 1 } } @@ -27,9 +29,27 @@ impl CounterSpan { if self.from > self.to { self.from } else { - self.to + self.to - 1 } } + + #[inline] + pub fn intersect(&self, other: &Self) -> Option { + let min = self.min().max(other.min()); + let max = self.max().min(other.max()); + if min <= 
max { + Some(CounterSpan::new(min, max)) + } else { + None + } + } + + #[inline] + pub fn does_intersect(&self, other: &Self) -> bool { + let min = self.min().max(other.min()); + let max = self.max().min(other.max()); + min <= max + } } impl HasLength for CounterSpan { @@ -90,6 +110,33 @@ impl IdSpan { pub fn max(&self) -> Counter { self.counter.max() } + + #[inline] + pub fn does_intersect(&self, other: &Self) -> bool { + self.client_id == other.client_id && self.counter.does_intersect(&other.counter) + } + + #[inline] + pub fn intersect(&self, other: &Self) -> Option { + if self.client_id != other.client_id { + None + } else { + Some(IdSpan { + client_id: self.client_id, + counter: self.counter.intersect(&other.counter)?, + }) + } + } + + #[inline] + pub fn start(&self) -> ID { + ID::new(self.client_id, self.counter.min()) + } + + #[inline] + pub fn end(&self) -> ID { + ID::new(self.client_id, self.counter.max()) + } } impl HasLength for IdSpan { diff --git a/crates/loro-core/src/value.rs b/crates/loro-core/src/value.rs index a1af55b9..a37145ea 100644 --- a/crates/loro-core/src/value.rs +++ b/crates/loro-core/src/value.rs @@ -96,7 +96,7 @@ impl LoroValue { } #[inline] - pub fn to_map(&self) -> Option<&FxHashMap> { + pub fn as_map(&self) -> Option<&FxHashMap> { match self { LoroValue::Map(m) => Some(m), _ => None, @@ -104,7 +104,7 @@ impl LoroValue { } #[inline] - pub fn to_list(&self) -> Option<&Vec> { + pub fn as_list(&self) -> Option<&Vec> { match self { LoroValue::List(l) => Some(l), _ => None, @@ -112,7 +112,7 @@ impl LoroValue { } #[inline] - pub fn to_string(&self) -> Option<&SmString> { + pub fn as_string(&self) -> Option<&SmString> { match self { LoroValue::String(s) => Some(s), _ => None, @@ -120,7 +120,7 @@ impl LoroValue { } #[inline] - pub fn to_integer(&self) -> Option { + pub fn as_integer(&self) -> Option { match self { LoroValue::Integer(i) => Some(*i), _ => None, @@ -128,7 +128,7 @@ impl LoroValue { } #[inline] - pub fn to_double(&self) -> Option 
{ + pub fn as_double(&self) -> Option { match self { LoroValue::Double(d) => Some(*d), _ => None, @@ -136,7 +136,7 @@ impl LoroValue { } #[inline] - pub fn to_bool(&self) -> Option { + pub fn as_bool(&self) -> Option { match self { LoroValue::Bool(b) => Some(*b), _ => None, @@ -144,7 +144,63 @@ impl LoroValue { } #[inline] - pub fn to_container(&self) -> Option<&ContainerID> { + pub fn as_container(&self) -> Option<&ContainerID> { + match self { + LoroValue::Unresolved(c) => Some(c), + _ => None, + } + } + + #[inline] + pub fn as_map_mut(&mut self) -> Option<&mut FxHashMap> { + match self { + LoroValue::Map(m) => Some(m), + _ => None, + } + } + + #[inline] + pub fn as_list_mut(&mut self) -> Option<&mut Vec> { + match self { + LoroValue::List(l) => Some(l), + _ => None, + } + } + + #[inline] + pub fn as_string_mut(&mut self) -> Option<&mut SmString> { + match self { + LoroValue::String(s) => Some(s), + _ => None, + } + } + + #[inline] + pub fn as_integer_mut(&mut self) -> Option<&mut i32> { + match self { + LoroValue::Integer(i) => Some(i), + _ => None, + } + } + + #[inline] + pub fn as_double_mut(&mut self) -> Option<&mut f64> { + match self { + LoroValue::Double(d) => Some(d), + _ => None, + } + } + + #[inline] + pub fn as_bool_mut(&mut self) -> Option<&mut bool> { + match self { + LoroValue::Bool(b) => Some(b), + _ => None, + } + } + + #[inline] + pub fn as_container_mut(&mut self) -> Option<&mut ContainerID> { match self { LoroValue::Unresolved(c) => Some(c), _ => None, diff --git a/crates/rle/Cargo.toml b/crates/rle/Cargo.toml index 19ed8d08..07b334c6 100644 --- a/crates/rle/Cargo.toml +++ b/crates/rle/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +num = "0.4.0" diff --git a/crates/rle/src/lib.rs b/crates/rle/src/lib.rs index 5af76106..368ecab7 100644 --- a/crates/rle/src/lib.rs +++ b/crates/rle/src/lib.rs @@ -1,2 +1,4 @@ -mod rle; -pub use 
crate::rle::{HasLength, Mergable, RleVec, SearchResult, Slice, SliceIterator, Sliceable}; +mod rle_trait; +mod rle_vec; +pub use crate::rle_trait::{HasLength, Mergable, Rle, Slice, Sliceable}; +pub use crate::rle_vec::{RleVec, SearchResult, SliceIterator}; diff --git a/crates/rle/src/rle_trait.rs b/crates/rle/src/rle_trait.rs new file mode 100644 index 00000000..d5ab3e81 --- /dev/null +++ b/crates/rle/src/rle_trait.rs @@ -0,0 +1,44 @@ +pub trait Mergable { + fn is_mergable(&self, _other: &Self, _conf: &Cfg) -> bool + where + Self: Sized, + { + false + } + + fn merge(&mut self, _other: &Self, _conf: &Cfg) + where + Self: Sized, + { + unreachable!() + } +} + +pub trait Sliceable { + fn slice(&self, from: usize, to: usize) -> Self; +} + +#[derive(Debug, Clone, Copy)] +pub struct Slice<'a, T> { + pub value: &'a T, + pub start: usize, + pub end: usize, +} + +impl Slice<'_, T> { + pub fn into_inner(&self) -> T { + self.value.slice(self.start, self.end) + } +} + +pub trait HasLength { + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn len(&self) -> usize; +} + +pub trait Rle: HasLength + Sliceable + Mergable {} + +impl, Cfg> Rle for T {} diff --git a/crates/rle/src/rle.rs b/crates/rle/src/rle_vec.rs similarity index 92% rename from crates/rle/src/rle.rs rename to crates/rle/src/rle_vec.rs index a22717f2..f7e4e4a9 100644 --- a/crates/rle/src/rle.rs +++ b/crates/rle/src/rle_vec.rs @@ -1,4 +1,7 @@ -use std::ops::Range; +use num::{cast, Integer, NumCast}; +use std::ops::{Range, Sub}; + +use crate::{HasLength, Mergable, Rle, Slice, Sliceable}; /// RleVec is a vector that can be compressed using run-length encoding. 
/// @@ -20,40 +23,6 @@ pub struct RleVec { cfg: Cfg, } -pub trait Mergable { - fn is_mergable(&self, _other: &Self, _conf: &Cfg) -> bool - where - Self: Sized, - { - false - } - - fn merge(&mut self, _other: &Self, _conf: &Cfg) - where - Self: Sized, - { - unreachable!() - } -} - -pub trait Sliceable { - fn slice(&self, from: usize, to: usize) -> Self; -} - -impl Slice<'_, T> { - pub fn into_inner(&self) -> T { - self.value.slice(self.start, self.end) - } -} - -pub trait HasLength { - fn is_empty(&self) -> bool { - self.len() == 0 - } - - fn len(&self) -> usize; -} - pub struct SearchResult<'a, T> { pub element: &'a T, pub merged_index: usize, @@ -253,13 +222,6 @@ pub struct SliceIterator<'a, T> { end_offset: usize, } -#[derive(Debug, Clone, Copy)] -pub struct Slice<'a, T> { - pub value: &'a T, - pub start: usize, - pub end: usize, -} - impl<'a, T: HasLength> Iterator for SliceIterator<'a, T> { type Item = Slice<'a, T>; @@ -316,6 +278,31 @@ impl HasLength for RleVec { } } +impl Sliceable for Range { + fn slice(&self, start: usize, end: usize) -> Self { + self.start + cast(start).unwrap()..self.start + cast(end).unwrap() + } +} + +impl + Copy> Mergable for Range { + fn is_mergable(&self, other: &Self, _: &()) -> bool { + other.start <= self.end && other.start >= self.start + } + + fn merge(&mut self, other: &Self, _conf: &()) + where + Self: Sized, + { + self.end = other.end; + } +} + +impl HasLength for Range { + fn len(&self) -> usize { + cast(self.end - self.start).unwrap() + } +} + #[cfg(test)] mod test { mod prime_value { From 8db47780b9ee5349b8c6c9db67dcd944317f6592 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 29 Jul 2022 18:10:06 +0800 Subject: [PATCH 03/16] fix: dag issues --- crates/loro-core/src/dag.rs | 113 +++++++++++++++++---------- crates/loro-core/src/dag/test.rs | 130 +++++++++++++++++++++++-------- 2 files changed, 169 insertions(+), 74 deletions(-) diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index 
2e661aee..c13abb63 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -18,10 +18,12 @@ pub trait DagNode { fn len(&self) -> usize; fn deps(&self) -> &Vec; + #[inline] fn is_empty(&self) -> bool { self.len() == 0 } + #[inline] fn dag_id_span(&self) -> IdSpan { let id = self.dag_id_start(); IdSpan { @@ -29,6 +31,16 @@ pub trait DagNode { counter: CounterSpan::new(id.counter, id.counter + self.len() as Counter), } } + + /// inclusive end + #[inline] + fn dag_id_end(&self) -> ID { + let id = self.dag_id_start(); + ID { + client_id: id.client_id, + counter: id.counter + self.len() as Counter - 1, + } + } } pub(crate) trait Dag { @@ -39,83 +51,98 @@ pub(crate) trait Dag { fn frontier(&self) -> &[ID]; fn roots(&self) -> Vec<&Self::Node>; - fn get_common_ancestor(&self, a: ID, b: ID) -> Option { - if a.client_id == b.client_id { - if a.counter <= b.counter { - Some(a) + // + // TODO: Maybe use Result return type + // TODO: benchmark + // TODO: visited + // how to test better? + // - converge through other nodes + // + /// only returns a single root. + /// but the least common ancestor may be more than one root. + /// But that is a rare case. 
+ fn find_common_ancestor(&self, a_id: ID, b_id: ID) -> Option { + if a_id.client_id == b_id.client_id { + if a_id.counter <= b_id.counter { + Some(a_id) } else { - Some(b) + Some(b_id) } } else { #[derive(Debug, PartialEq, Eq)] - struct OrdId { + struct OrdId<'a> { id: ID, lamport: Lamport, + deps: &'a [ID], } - impl PartialOrd for OrdId { + impl<'a> PartialOrd for OrdId<'a> { fn partial_cmp(&self, other: &Self) -> Option { Some(self.lamport.cmp(&other.lamport)) } } - impl Ord for OrdId { + impl<'a> Ord for OrdId<'a> { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.lamport.cmp(&other.lamport) } } - let mut a_map: HashMap, _> = FxHashMap::default(); - let mut b_map: HashMap, _> = FxHashMap::default(); - let mut a_heap: BinaryHeap = BinaryHeap::new(); - let mut b_heap: BinaryHeap = BinaryHeap::new(); + let mut _a_vv: HashMap = FxHashMap::default(); + let mut _b_vv: HashMap = FxHashMap::default(); + let mut _a_heap: BinaryHeap = BinaryHeap::new(); + let mut _b_heap: BinaryHeap = BinaryHeap::new(); { - let a = self.get(a).unwrap(); - let b = self.get(b).unwrap(); - a_heap.push(OrdId { - id: a.dag_id_start(), - lamport: a.lamport_start() + a.len() as Lamport, + let a = self.get(a_id).unwrap(); + let b = self.get(b_id).unwrap(); + _a_heap.push(OrdId { + id: a_id, + lamport: a_id.counter + a.lamport_start() - a.dag_id_start().counter, + deps: a.deps(), }); - b_heap.push(OrdId { - id: b.dag_id_start(), - lamport: b.lamport_start() + b.len() as Lamport, + _b_heap.push(OrdId { + id: b_id, + lamport: b_id.counter + b.lamport_start() - b.dag_id_start().counter, + deps: b.deps(), }); + _a_vv.insert(a_id.client_id, a_id.counter + 1); + _b_vv.insert(b_id.client_id, b_id.counter + 1); } - while !a_heap.is_empty() || !b_heap.is_empty() { - let (a_heap, b_heap, a_map, b_map) = - if a_heap.peek().map(|x| x.lamport).unwrap_or(0) - < b_heap.peek().map(|x| x.lamport).unwrap_or(0) - { - // swap - (&mut b_heap, &mut a_heap, &mut b_map, &mut a_map) - } else { - (&mut a_heap, 
&mut b_heap, &mut a_map, &mut b_map) - }; + while !_a_heap.is_empty() || !_b_heap.is_empty() { + let (a_heap, b_heap, a_vv, b_vv) = if _a_heap.is_empty() + || (_a_heap.peek().map(|x| x.lamport).unwrap_or(0) + < _b_heap.peek().map(|x| x.lamport).unwrap_or(0)) + { + // swap + (&mut _b_heap, &mut _a_heap, &mut _b_vv, &mut _a_vv) + } else { + (&mut _a_heap, &mut _b_heap, &mut _a_vv, &mut _b_vv) + }; while !a_heap.is_empty() && a_heap.peek().map(|x| x.lamport).unwrap_or(0) >= b_heap.peek().map(|x| x.lamport).unwrap_or(0) { - let id = a_heap.pop().unwrap().id; - if let Some(range) = b_map.get(&id.client_id) { - if range.contains(&id.counter) { + let a = a_heap.pop().unwrap(); + let id = a.id; + if let Some(counter_end) = b_vv.get(&id.client_id) { + if id.counter < *counter_end { return Some(id); } } - let a = self.get(id).unwrap(); - for dep in a.deps() { + for dep_id in a.deps { + let dep = self.get(*dep_id).unwrap(); a_heap.push(OrdId { - id: *dep, - lamport: a.lamport_start() + a.len() as Lamport, + id: *dep_id, + lamport: dep_id.counter + dep.lamport_start() + - dep.dag_id_start().counter, + deps: dep.deps(), }); - } - if let Some(range) = a_map.get_mut(&id.client_id) { - range.start = a.dag_id_start().counter; - } else { - let span = a.dag_id_span(); - a_map.insert(id.client_id, span.counter.from..span.counter.to); + + a_vv.entry(dep_id.client_id) + .or_insert_with(|| dep_id.counter + 1); } } } diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index 695a5d1b..b033fc5c 100644 --- a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -145,7 +145,11 @@ impl TestDag { pending.push((client_id, i)); return true; } - update_frontier(&mut self.frontier, node.id, &node.deps); + update_frontier( + &mut self.frontier, + node.id.inc((node.len() - 1) as u32), + &node.deps, + ); self.nodes .entry(client_id) .or_insert(vec![]) @@ -191,28 +195,109 @@ fn test_dag() { assert_eq!(b.next_lamport, 3); assert_eq!(b.frontier().len(), 
2); assert_eq!( - b.get_common_ancestor(ID::new(0, 2), ID::new(1, 1)), + b.find_common_ancestor(ID::new(0, 2), ID::new(1, 1)), Some(ID::new(1, 0)) ); } -#[cfg(not(no_proptest))] +#[derive(Debug, Clone, Copy)] +struct Interaction { + dag_idx: usize, + merge_with: Option, + len: usize, +} + mod find_common_ancestors { + use super::*; + + #[test] + fn no_common_ancestors() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + a.push(1); + b.push(1); + a.merge(&b); + let actual = a.find_common_ancestor(ID::new(0, 0), ID::new(1, 0)); + assert_eq!(actual, None); + + // interactions between b and c + let mut c = TestDag::new(2); + c.merge(&b); + c.push(2); + b.merge(&c); + b.push(3); + + // should no exist any common ancestor between a and b + let actual = a.find_common_ancestor(ID::new(0, 0), ID::new(1, 0)); + assert_eq!(actual, None); + } + + #[test] + fn dep_in_middle() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + a.push(4); + b.push(4); + b.push(5); + b.merge(&a); + b.frontier.retain(|x| x.client_id == 1); + let k = b.nodes.get_mut(&1).unwrap(); + k[1].deps.push(ID::new(0, 2)); + assert_eq!( + b.find_common_ancestor(ID::new(0, 3), ID::new(1, 8)), + Some(ID::new(0, 2)) + ); + } + + /// ![](https://i.ibb.co/C5xLG53/image.png) + #[test] + fn large_lamport_with_longer_path() { + let mut a0 = TestDag::new(0); + let mut a1 = TestDag::new(1); + let mut a2 = TestDag::new(2); + + a0.push(3); + a1.push(3); + a2.push(2); + a2.merge(&a0); + a2.push(1); + a1.merge(&a2); + a2.push(1); + a1.push(1); + a1.merge(&a2); + a1.push(1); + a1.nodes + .get_mut(&1) + .unwrap() + .last_mut() + .unwrap() + .deps + .push(ID::new(0, 1)); + a0.push(1); + a1.merge(&a2); + a1.merge(&a0); + assert_eq!( + a1.find_common_ancestor(ID::new(0, 3), ID::new(1, 4)), + Some(ID::new(0, 2)) + ); + } +} + +#[cfg(not(no_proptest))] +mod find_common_ancestors_proptest { use proptest::prelude::*; use crate::{array_mut_ref, unsafe_array_mut_ref}; use super::*; - #[derive(Debug, 
Clone, Copy)] - struct Interaction { - dag_idx: usize, - merge_with: Option, - len: usize, - } - prop_compose! { - fn gen_interaction(num: usize)(dag_idx in 0..num, merge_with in 0..num, length in 1..10, should_merge in 0..2) -> Interaction { + fn gen_interaction(num: usize) ( + dag_idx in 0..num, + merge_with in 0..num, + length in 1..10, + should_merge in 0..2 + ) -> Interaction { Interaction { dag_idx, merge_with: if should_merge == 1 && merge_with != dag_idx { Some(merge_with) } else { None }, @@ -238,14 +323,6 @@ mod find_common_ancestors { test(3, before_merged_insertions, after_merged_insertions)?; } - #[test] - fn test_4dags( - before_merged_insertions in prop::collection::vec(gen_interaction(4), 0..300), - after_merged_insertions in prop::collection::vec(gen_interaction(4), 0..300) - ) { - test(4, before_merged_insertions, after_merged_insertions)?; - } - #[test] fn test_10dags( before_merged_insertions in prop::collection::vec(gen_interaction(10), 0..300), @@ -253,14 +330,6 @@ mod find_common_ancestors { ) { test(10, before_merged_insertions, after_merged_insertions)?; } - - #[test] - fn test_100dags( - before_merged_insertions in prop::collection::vec(gen_interaction(100), 0..2000), - after_merged_insertions in prop::collection::vec(gen_interaction(100), 0..2000) - ) { - test(100, before_merged_insertions, after_merged_insertions)?; - } } fn preprocess(interactions: &mut [Interaction], num: i32) { @@ -317,10 +386,9 @@ mod find_common_ancestors { dag1.push(1); dag0.merge(dag1); // dbg!(dag0, dag1, expected); - let actual = dags[0].get_common_ancestor( - dags[0].nodes.get(&0).unwrap().last().unwrap().id, - dags[1].nodes.get(&1).unwrap().last().unwrap().id, - ); + let a = dags[0].nodes.get(&0).unwrap().last().unwrap().id; + let b = dags[1].nodes.get(&1).unwrap().last().unwrap().id; + let actual = dags[0].find_common_ancestor(a, b); prop_assert_eq!(actual.unwrap(), expected); Ok(()) } From fec3c272f8494043fbd46f8139beae7c57e47cc9 Mon Sep 17 00:00:00 2001 
From: Zixuan Chen Date: Sat, 30 Jul 2022 01:51:55 +0800 Subject: [PATCH 04/16] fix: first met dep may have smaller counter --- crates/loro-core/src/dag.rs | 55 ++++++++++++++++++++++++++------ crates/loro-core/src/dag/test.rs | 8 ++--- 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index c13abb63..69824118 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -41,6 +41,11 @@ pub trait DagNode { counter: id.counter + self.len() as Counter - 1, } } + + #[inline] + fn get_lamport_from_counter(&self, c: Counter) -> Lamport { + self.lamport_start() + c - self.dag_id_start().counter + } } pub(crate) trait Dag { @@ -97,12 +102,12 @@ pub(crate) trait Dag { let b = self.get(b_id).unwrap(); _a_heap.push(OrdId { id: a_id, - lamport: a_id.counter + a.lamport_start() - a.dag_id_start().counter, + lamport: a.get_lamport_from_counter(a_id.counter), deps: a.deps(), }); _b_heap.push(OrdId { id: b_id, - lamport: b_id.counter + b.lamport_start() - b.dag_id_start().counter, + lamport: b.get_lamport_from_counter(b_id.counter), deps: b.deps(), }); _a_vv.insert(a_id.client_id, a_id.counter + 1); @@ -110,14 +115,14 @@ pub(crate) trait Dag { } while !_a_heap.is_empty() || !_b_heap.is_empty() { - let (a_heap, b_heap, a_vv, b_vv) = if _a_heap.is_empty() + let (a_heap, b_heap, a_vv, b_vv, _swapped) = if _a_heap.is_empty() || (_a_heap.peek().map(|x| x.lamport).unwrap_or(0) < _b_heap.peek().map(|x| x.lamport).unwrap_or(0)) { // swap - (&mut _b_heap, &mut _a_heap, &mut _b_vv, &mut _a_vv) + (&mut _b_heap, &mut _a_heap, &mut _b_vv, &mut _a_vv, true) } else { - (&mut _a_heap, &mut _b_heap, &mut _a_vv, &mut _b_vv) + (&mut _a_heap, &mut _b_heap, &mut _a_vv, &mut _b_vv, false) }; while !a_heap.is_empty() @@ -132,17 +137,35 @@ pub(crate) trait Dag { } } + // if swapped { + // println!("A"); + // } else { + // println!("B"); + // } + // dbg!(&a); + + #[cfg(debug_assertions)] + { + if let Some(v) = 
a_vv.get(&a.id.client_id) { + assert!(*v > a.id.counter) + } + } + for dep_id in a.deps { let dep = self.get(*dep_id).unwrap(); a_heap.push(OrdId { id: *dep_id, - lamport: dep_id.counter + dep.lamport_start() - - dep.dag_id_start().counter, + lamport: dep.get_lamport_from_counter(dep_id.counter), deps: dep.deps(), }); - a_vv.entry(dep_id.client_id) - .or_insert_with(|| dep_id.counter + 1); + if let Some(v) = a_vv.get_mut(&dep_id.client_id) { + if *v < dep_id.counter + 1 { + *v = dep_id.counter + 1; + } + } else { + a_vv.insert(dep_id.client_id, dep_id.counter + 1); + } } } } @@ -154,9 +177,21 @@ pub(crate) trait Dag { fn update_frontier(frontier: &mut Vec, new_node_id: ID, new_node_deps: &[ID]) { frontier.retain(|x| { + if x.client_id == new_node_id.client_id && x.counter <= new_node_id.counter { + return false; + } + !new_node_deps .iter() .any(|y| y.client_id == x.client_id && y.counter >= x.counter) }); - frontier.push(new_node_id); + + // nodes from the same client with `counter < new_node_id.counter` + // are filtered out from frontier. 
+ if frontier + .iter() + .all(|x| x.client_id != new_node_id.client_id) + { + frontier.push(new_node_id); + } } diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index b033fc5c..87d4a0ce 100644 --- a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -316,11 +316,11 @@ mod find_common_ancestors_proptest { } #[test] - fn test_3dags( - before_merged_insertions in prop::collection::vec(gen_interaction(3), 0..300), - after_merged_insertions in prop::collection::vec(gen_interaction(3), 0..300) + fn test_4dags( + before_merged_insertions in prop::collection::vec(gen_interaction(4), 0..300), + after_merged_insertions in prop::collection::vec(gen_interaction(4), 0..300) ) { - test(3, before_merged_insertions, after_merged_insertions)?; + test(4, before_merged_insertions, after_merged_insertions)?; } #[test] From c6f268edda9e0d0b1d2d362d9865d2328fe4ef2c Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Sun, 31 Jul 2022 15:56:15 +0800 Subject: [PATCH 05/16] chore: optimize build time --- crates/loro-core/Cargo.toml | 4 ++++ crates/loro-core/src/dag.rs | 1 + crates/loro-core/src/lib.rs | 1 + crates/loro-framework/Cargo.toml | 3 +++ rust-toolchain | 2 +- 5 files changed, 10 insertions(+), 1 deletion(-) diff --git a/crates/loro-core/Cargo.toml b/crates/loro-core/Cargo.toml index 1544cbf0..bc5831da 100644 --- a/crates/loro-core/Cargo.toml +++ b/crates/loro-core/Cargo.toml @@ -20,3 +20,7 @@ thiserror = "1.0.31" [dev-dependencies] proptest = "1.0.0" proptest-derive = "0.3.0" + +# See https://matklad.github.io/2021/02/27/delete-cargo-integration-tests.html +[lib] +doctest = false diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index 69824118..250bf2c2 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -4,6 +4,7 @@ use std::{ }; use fxhash::FxHashMap; +#[cfg(test)] mod test; use crate::{ diff --git a/crates/loro-core/src/lib.rs b/crates/loro-core/src/lib.rs index 
b8c91195..102ae2a7 100644 --- a/crates/loro-core/src/lib.rs +++ b/crates/loro-core/src/lib.rs @@ -17,6 +17,7 @@ mod loro; mod smstring; mod snapshot; mod span; +#[cfg(test)] mod tests; mod value; diff --git a/crates/loro-framework/Cargo.toml b/crates/loro-framework/Cargo.toml index 507ff84c..487889e3 100644 --- a/crates/loro-framework/Cargo.toml +++ b/crates/loro-framework/Cargo.toml @@ -11,3 +11,6 @@ loro-core = {path = "../loro-core"} ring = "0.16.20" rle = {path = "../rle"} sha2 = "0.10.2" + +[lib] +doctest = false diff --git a/rust-toolchain b/rust-toolchain index bf867e0a..2bf5ad04 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly +stable From 2d2ca7620188b94b7ed1b4b6c735d21339da5d05 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Wed, 3 Aug 2022 18:27:13 +0800 Subject: [PATCH 06/16] perf: use visited --- crates/loro-core/src/dag.rs | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index 250bf2c2..af82aa0b 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -1,9 +1,9 @@ use std::{ - collections::{BinaryHeap, HashMap, VecDeque}, + collections::{BinaryHeap, HashMap, HashSet, VecDeque}, ops::Range, }; -use fxhash::FxHashMap; +use fxhash::{FxHashMap, FxHashSet}; #[cfg(test)] mod test; @@ -60,7 +60,6 @@ pub(crate) trait Dag { // // TODO: Maybe use Result return type // TODO: benchmark - // TODO: visited // how to test better? 
// - converge through other nodes // @@ -98,6 +97,8 @@ pub(crate) trait Dag { let mut _b_vv: HashMap = FxHashMap::default(); let mut _a_heap: BinaryHeap = BinaryHeap::new(); let mut _b_heap: BinaryHeap = BinaryHeap::new(); + let mut _a_visited: FxHashSet = FxHashSet::default(); + let mut _b_visited: FxHashSet = FxHashSet::default(); { let a = self.get(a_id).unwrap(); let b = self.get(b_id).unwrap(); @@ -116,14 +117,31 @@ pub(crate) trait Dag { } while !_a_heap.is_empty() || !_b_heap.is_empty() { - let (a_heap, b_heap, a_vv, b_vv, _swapped) = if _a_heap.is_empty() + let (a_heap, b_heap, a_vv, b_vv, a_visited, b_visited, _swapped) = if _a_heap + .is_empty() || (_a_heap.peek().map(|x| x.lamport).unwrap_or(0) < _b_heap.peek().map(|x| x.lamport).unwrap_or(0)) { // swap - (&mut _b_heap, &mut _a_heap, &mut _b_vv, &mut _a_vv, true) + ( + &mut _b_heap, + &mut _a_heap, + &mut _b_vv, + &mut _a_vv, + &mut _b_visited, + &mut _a_visited, + true, + ) } else { - (&mut _a_heap, &mut _b_heap, &mut _a_vv, &mut _b_vv, false) + ( + &mut _a_heap, + &mut _b_heap, + &mut _a_vv, + &mut _b_vv, + &mut _a_visited, + &mut _b_visited, + false, + ) }; while !a_heap.is_empty() @@ -153,13 +171,17 @@ pub(crate) trait Dag { } for dep_id in a.deps { + if a_visited.contains(dep_id) { + continue; + } + let dep = self.get(*dep_id).unwrap(); a_heap.push(OrdId { id: *dep_id, lamport: dep.get_lamport_from_counter(dep_id.counter), deps: dep.deps(), }); - + a_visited.insert(*dep_id); if let Some(v) = a_vv.get_mut(&dep_id.client_id) { if *v < dep_id.counter + 1 { *v = dep_id.counter + 1; From ed145367e07caf2e2e9828944c982bc01cc8486a Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Wed, 3 Aug 2022 21:10:20 +0800 Subject: [PATCH 07/16] feat: dag find path --- crates/loro-core/src/change.rs | 4 +- .../src/container/text/text_content.rs | 10 +- crates/loro-core/src/dag.rs | 348 ++++++++++++------ crates/loro-core/src/dag/test.rs | 72 +++- crates/loro-core/src/id.rs | 14 +- crates/loro-core/src/lib.rs | 2 +- 
crates/loro-core/src/log_store/iter.rs | 2 +- crates/loro-core/src/op.rs | 6 +- crates/loro-core/src/op/op_proxy.rs | 15 +- crates/loro-core/src/span.rs | 41 ++- 10 files changed, 364 insertions(+), 150 deletions(-) diff --git a/crates/loro-core/src/change.rs b/crates/loro-core/src/change.rs index 6d458363..d85d9765 100644 --- a/crates/loro-core/src/change.rs +++ b/crates/loro-core/src/change.rs @@ -97,7 +97,7 @@ impl Mergable for Change { } if other.deps.is_empty() - || (other.deps.len() == 1 && self.id.is_connected_id(&other.deps[0], self.len() as u32)) + || (other.deps.len() == 1 && self.id.is_connected_id(&other.deps[0], self.len())) { return false; } @@ -111,7 +111,7 @@ impl Mergable for Change { } self.id.client_id == other.id.client_id - && self.id.counter + self.len() as u32 == other.id.counter + && self.id.counter + self.len() as Counter == other.id.counter && self.lamport + self.len() as Lamport == other.lamport } } diff --git a/crates/loro-core/src/container/text/text_content.rs b/crates/loro-core/src/container/text/text_content.rs index bd28cbef..0c637ea3 100644 --- a/crates/loro-core/src/container/text/text_content.rs +++ b/crates/loro-core/src/container/text/text_content.rs @@ -1,4 +1,4 @@ -use crate::{ContentType, InsertContent, ID}; +use crate::{id::Counter, ContentType, InsertContent, ID}; use rle::{HasLength, Mergable, Sliceable}; #[derive(Debug, Clone)] @@ -12,9 +12,9 @@ pub struct TextContent { impl Mergable for TextContent { fn is_mergable(&self, other: &Self, _: &()) -> bool { other.id.client_id == self.id.client_id - && self.id.counter + self.len() as u32 == other.id.counter + && self.id.counter + self.len() as Counter == other.id.counter && self.id.client_id == other.origin_left.client_id - && self.id.counter + self.len() as u32 - 1 == other.origin_left.counter + && self.id.counter + self.len() as Counter - 1 == other.origin_left.counter && self.origin_right == other.origin_right } @@ -36,12 +36,12 @@ impl Sliceable for TextContent { 
TextContent { origin_left: ID { client_id: self.id.client_id, - counter: self.id.counter + from as u32 - 1, + counter: self.id.counter + from as Counter - 1, }, origin_right: self.origin_right, id: ID { client_id: self.id.client_id, - counter: self.id.counter + from as u32, + counter: self.id.counter + from as Counter, }, text: self.text[from..to].to_owned(), } diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index af82aa0b..af096b6d 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -45,7 +45,21 @@ pub trait DagNode { #[inline] fn get_lamport_from_counter(&self, c: Counter) -> Lamport { - self.lamport_start() + c - self.dag_id_start().counter + self.lamport_start() + c as Lamport - self.dag_id_start().counter as Lamport + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct Path { + retreat: Vec, + forward: Vec, +} + +#[allow(clippy::ptr_arg)] +fn reverse_path(path: &mut Vec) { + path.reverse(); + for span in path.iter_mut() { + span.counter.reverse(); } } @@ -59,6 +73,7 @@ pub(crate) trait Dag { // // TODO: Maybe use Result return type + // TODO: Maybe we only need one heap? // TODO: benchmark // how to test better? // - converge through other nodes @@ -66,135 +81,226 @@ pub(crate) trait Dag { /// only returns a single root. /// but the least common ancestor may be more than one root. /// But that is a rare case. 
+ /// + #[inline] fn find_common_ancestor(&self, a_id: ID, b_id: ID) -> Option { - if a_id.client_id == b_id.client_id { - if a_id.counter <= b_id.counter { - Some(a_id) - } else { - Some(b_id) - } - } else { - #[derive(Debug, PartialEq, Eq)] - struct OrdId<'a> { - id: ID, - lamport: Lamport, - deps: &'a [ID], - } + find_common_ancestor( + &|id| self.get(id).map(|x| x as &dyn DagNode), + a_id, + b_id, + |_, _, _| {}, + ) + } - impl<'a> PartialOrd for OrdId<'a> { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.lamport.cmp(&other.lamport)) - } - } + #[inline] + fn find_path(&self, from: ID, to: ID) -> Option { + let mut ans: Option = None; - impl<'a> Ord for OrdId<'a> { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.lamport.cmp(&other.lamport) - } - } - - let mut _a_vv: HashMap = FxHashMap::default(); - let mut _b_vv: HashMap = FxHashMap::default(); - let mut _a_heap: BinaryHeap = BinaryHeap::new(); - let mut _b_heap: BinaryHeap = BinaryHeap::new(); - let mut _a_visited: FxHashSet = FxHashSet::default(); - let mut _b_visited: FxHashSet = FxHashSet::default(); - { - let a = self.get(a_id).unwrap(); - let b = self.get(b_id).unwrap(); - _a_heap.push(OrdId { - id: a_id, - lamport: a.get_lamport_from_counter(a_id.counter), - deps: a.deps(), - }); - _b_heap.push(OrdId { - id: b_id, - lamport: b.get_lamport_from_counter(b_id.counter), - deps: b.deps(), - }); - _a_vv.insert(a_id.client_id, a_id.counter + 1); - _b_vv.insert(b_id.client_id, b_id.counter + 1); - } - - while !_a_heap.is_empty() || !_b_heap.is_empty() { - let (a_heap, b_heap, a_vv, b_vv, a_visited, b_visited, _swapped) = if _a_heap - .is_empty() - || (_a_heap.peek().map(|x| x.lamport).unwrap_or(0) - < _b_heap.peek().map(|x| x.lamport).unwrap_or(0)) - { - // swap - ( - &mut _b_heap, - &mut _a_heap, - &mut _b_vv, - &mut _a_vv, - &mut _b_visited, - &mut _a_visited, - true, - ) - } else { - ( - &mut _a_heap, - &mut _b_heap, - &mut _a_vv, - &mut _b_vv, - &mut _a_visited, - &mut _b_visited, 
- false, - ) - }; - - while !a_heap.is_empty() - && a_heap.peek().map(|x| x.lamport).unwrap_or(0) - >= b_heap.peek().map(|x| x.lamport).unwrap_or(0) - { - let a = a_heap.pop().unwrap(); - let id = a.id; - if let Some(counter_end) = b_vv.get(&id.client_id) { - if id.counter < *counter_end { - return Some(id); - } - } - - // if swapped { - // println!("A"); - // } else { - // println!("B"); - // } - // dbg!(&a); - - #[cfg(debug_assertions)] - { - if let Some(v) = a_vv.get(&a.id.client_id) { - assert!(*v > a.id.counter) - } - } - - for dep_id in a.deps { - if a_visited.contains(dep_id) { - continue; - } - - let dep = self.get(*dep_id).unwrap(); - a_heap.push(OrdId { - id: *dep_id, - lamport: dep.get_lamport_from_counter(dep_id.counter), - deps: dep.deps(), + #[inline] + fn get_rev_path(target: ID, from: ID, to_from_map: &FxHashMap) -> Vec { + let mut last_visited: Option = None; + let mut a_rev_path = vec![]; + let mut node_id = target; + node_id = *to_from_map.get(&node_id).unwrap(); + loop { + if let Some(last_id) = last_visited { + if last_id.client_id == node_id.client_id { + debug_assert!(last_id.counter < node_id.counter); + a_rev_path.push(IdSpan { + client_id: last_id.client_id, + counter: CounterSpan::new(last_id.counter, node_id.counter + 1), }); - a_visited.insert(*dep_id); - if let Some(v) = a_vv.get_mut(&dep_id.client_id) { - if *v < dep_id.counter + 1 { - *v = dep_id.counter + 1; - } - } else { - a_vv.insert(dep_id.client_id, dep_id.counter + 1); + last_visited = None; + } else { + a_rev_path.push(IdSpan { + client_id: last_id.client_id, + counter: CounterSpan::new(last_id.counter, last_id.counter + 1), + }); + last_visited = Some(node_id); + } + } else { + last_visited = Some(node_id); + } + + if node_id == from { + break; + } + + node_id = *to_from_map.get(&node_id).unwrap(); + } + + if let Some(last_id) = last_visited { + a_rev_path.push(IdSpan { + client_id: last_id.client_id, + counter: CounterSpan::new(last_id.counter, last_id.counter + 1), + }); + 
} + + a_rev_path + } + + find_common_ancestor( + &|id| self.get(id).map(|x| x as &dyn DagNode), + from, + to, + |ancestor, a_path, b_path| { + let mut a_path = get_rev_path(ancestor, from, a_path); + let b_path = get_rev_path(ancestor, to, b_path); + reverse_path(&mut a_path); + ans = Some(Path { + retreat: a_path, + forward: b_path, + }); + }, + ); + + ans + } +} + +fn find_common_ancestor<'a, F, G>(get: &'a F, a_id: ID, b_id: ID, mut on_found: G) -> Option +where + F: Fn(ID) -> Option<&'a dyn DagNode>, + G: FnMut(ID, &FxHashMap, &FxHashMap), +{ + if a_id.client_id == b_id.client_id { + if a_id.counter <= b_id.counter { + Some(a_id) + } else { + Some(b_id) + } + } else { + #[derive(Debug, PartialEq, Eq)] + struct OrdId<'a> { + id: ID, + lamport: Lamport, + deps: &'a [ID], + } + + impl<'a> PartialOrd for OrdId<'a> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.lamport.cmp(&other.lamport)) + } + } + + impl<'a> Ord for OrdId<'a> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.lamport.cmp(&other.lamport) + } + } + + let mut _a_vv: HashMap = FxHashMap::default(); + let mut _b_vv: HashMap = FxHashMap::default(); + // Invariant: every op id inserted to the a_heap is a key in a_path map, except for a_id + let mut _a_heap: BinaryHeap = BinaryHeap::new(); + // Likewise + let mut _b_heap: BinaryHeap = BinaryHeap::new(); + // FxHashMap is used to track the deps path of each node + let mut _a_path: FxHashMap = FxHashMap::default(); + let mut _b_path: FxHashMap = FxHashMap::default(); + { + let a = get(a_id).unwrap(); + let b = get(b_id).unwrap(); + _a_heap.push(OrdId { + id: a_id, + lamport: a.get_lamport_from_counter(a_id.counter), + deps: a.deps(), + }); + _b_heap.push(OrdId { + id: b_id, + lamport: b.get_lamport_from_counter(b_id.counter), + deps: b.deps(), + }); + _a_vv.insert(a_id.client_id, a_id.counter + 1); + _b_vv.insert(b_id.client_id, b_id.counter + 1); + } + + while !_a_heap.is_empty() || !_b_heap.is_empty() { + let (a_heap, b_heap, 
a_vv, b_vv, a_path, b_path, _swapped) = if _a_heap.is_empty() + || (_a_heap.peek().map(|x| x.lamport).unwrap_or(0) + < _b_heap.peek().map(|x| x.lamport).unwrap_or(0)) + { + // swap + ( + &mut _b_heap, + &mut _a_heap, + &mut _b_vv, + &mut _a_vv, + &mut _b_path, + &mut _a_path, + true, + ) + } else { + ( + &mut _a_heap, + &mut _b_heap, + &mut _a_vv, + &mut _b_vv, + &mut _a_path, + &mut _b_path, + false, + ) + }; + + while !a_heap.is_empty() + && a_heap.peek().map(|x| x.lamport).unwrap_or(0) + >= b_heap.peek().map(|x| x.lamport).unwrap_or(0) + { + let a = a_heap.pop().unwrap(); + let id = a.id; + if let Some(counter_end) = b_vv.get(&id.client_id) { + if id.counter < *counter_end { + b_path + .entry(id) + .or_insert_with(|| ID::new(id.client_id, counter_end - 1)); + + on_found(id, &_a_path, &_b_path); + return Some(id); + } + } + + // if swapped { + // println!("A"); + // } else { + // println!("B"); + // } + // dbg!(&a); + + #[cfg(debug_assertions)] + { + if let Some(v) = a_vv.get(&a.id.client_id) { + assert!(*v > a.id.counter) + } + } + + for dep_id in a.deps { + if a_path.contains_key(dep_id) { + continue; + } + + let dep = get(*dep_id).unwrap(); + a_heap.push(OrdId { + id: *dep_id, + lamport: dep.get_lamport_from_counter(dep_id.counter), + deps: dep.deps(), + }); + a_path.insert(*dep_id, a.id); + if dep.dag_id_start() != *dep_id { + a_path.insert(dep.dag_id_start(), *dep_id); + } + + if let Some(v) = a_vv.get_mut(&dep_id.client_id) { + if *v < dep_id.counter + 1 { + *v = dep_id.counter + 1; } + } else { + a_vv.insert(dep_id.client_id, dep_id.counter + 1); } } } - - None } + + None } } diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index 87d4a0ce..1bf5d44b 100644 --- a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -62,6 +62,7 @@ impl Dag for TestDag { id.counter >= node.id.counter && id.counter < node.id.counter + node.len as Counter }) } + fn frontier(&self) -> &[ID] { &self.frontier } @@ -89,11 +90,19 
@@ impl TestDag { } } + fn get_last_node(&mut self) -> &mut TestNode { + self.nodes + .get_mut(&self.client_id) + .unwrap() + .last_mut() + .unwrap() + } + fn push(&mut self, len: usize) { let client_id = self.client_id; let counter = self.version_vec.entry(client_id).or_insert(0); let id = ID::new(client_id, *counter); - *counter += len as u32; + *counter += len as Counter; let deps = std::mem::replace(&mut self.frontier, vec![id]); self.nodes .entry(client_id) @@ -147,7 +156,7 @@ impl TestDag { } update_frontier( &mut self.frontier, - node.id.inc((node.len() - 1) as u32), + node.id.inc((node.len() - 1) as Counter), &node.deps, ); self.nodes @@ -155,7 +164,7 @@ impl TestDag { .or_insert(vec![]) .push(node.clone()); self.version_vec - .insert(client_id, node.id.counter + node.len as u32); + .insert(client_id, node.id.counter + node.len as Counter); self.next_lamport = self.next_lamport.max(node.lamport + node.len as u32); false } @@ -207,6 +216,63 @@ struct Interaction { len: usize, } +mod find_path { + use super::*; + + #[test] + fn no_path() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + a.push(1); + b.push(1); + a.merge(&b); + let actual = a.find_path(ID::new(0, 0), ID::new(1, 0)); + assert_eq!(actual, None); + } + + #[test] + fn one_path() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + // 0 - 0 + a.push(1); + b.merge(&a); + // 1 - 0 + b.push(1); + // 0 - 1 + a.push(1); + a.merge(&b); + let actual = a.find_path(ID::new(0, 1), ID::new(1, 0)); + assert_eq!( + actual, + Some(Path { + retreat: vec![IdSpan::new(0, CounterSpan::new(1, 0))], + forward: vec![IdSpan::new(1, CounterSpan::new(0, 1))], + }) + ); + } + + #[test] + fn middle() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + a.push(4); + b.push(1); + b.push(1); + let node = b.get_last_node(); + node.deps.push(ID::new(0, 2)); + b.merge(&a); + let actual = b.find_path(ID::new(0, 3), ID::new(1, 1)); + assert_eq!( + actual, + Some(Path { + retreat: 
vec![IdSpan::new(0, CounterSpan::new(3, 2))], + forward: vec![IdSpan::new(1, CounterSpan::new(1, 2))], + }) + ); + } +} + mod find_common_ancestors { use super::*; diff --git a/crates/loro-core/src/id.rs b/crates/loro-core/src/id.rs index f2d446e7..ec8e9188 100644 --- a/crates/loro-core/src/id.rs +++ b/crates/loro-core/src/id.rs @@ -1,21 +1,21 @@ use serde::Serialize; pub type ClientID = u64; -pub type Counter = u32; +pub type Counter = i32; #[derive(PartialEq, Eq, Hash, Clone, Debug, Copy, PartialOrd, Ord, Serialize)] pub struct ID { pub client_id: u64, - pub counter: u32, + pub counter: Counter, } pub const ROOT_ID: ID = ID { client_id: u64::MAX, - counter: u32::MAX, + counter: i32::MAX, }; impl ID { - pub fn new(client_id: u64, counter: u32) -> Self { + pub fn new(client_id: u64, counter: Counter) -> Self { ID { client_id, counter } } @@ -28,11 +28,11 @@ impl ID { } #[inline] - pub(crate) fn is_connected_id(&self, other: &Self, self_len: u32) -> bool { - self.client_id == other.client_id && self.counter + self_len == other.counter + pub(crate) fn is_connected_id(&self, other: &Self, self_len: usize) -> bool { + self.client_id == other.client_id && self.counter + self_len as Counter == other.counter } - pub fn inc(&self, inc: u32) -> Self { + pub fn inc(&self, inc: i32) -> Self { ID { client_id: self.client_id, counter: self.counter + inc, diff --git a/crates/loro-core/src/lib.rs b/crates/loro-core/src/lib.rs index 102ae2a7..6d24dede 100644 --- a/crates/loro-core/src/lib.rs +++ b/crates/loro-core/src/lib.rs @@ -1,7 +1,7 @@ //! # Loro //! //! 
-#![allow(dead_code, unused_imports, clippy::explicit_auto_deref)] +#![allow(dead_code, unused_imports)] pub mod change; pub mod configure; diff --git a/crates/loro-core/src/log_store/iter.rs b/crates/loro-core/src/log_store/iter.rs index 557cf15a..351a9fd5 100644 --- a/crates/loro-core/src/log_store/iter.rs +++ b/crates/loro-core/src/log_store/iter.rs @@ -61,7 +61,7 @@ impl<'a> Iterator for OpIter<'a> { self.heap.push(Reverse(OpProxy::new( change, op, - Some(start..(op.len() as u32)), + Some(start..(op.len() as Counter)), ))); } } diff --git a/crates/loro-core/src/op.rs b/crates/loro-core/src/op.rs index 7d6712f0..1bca4ae9 100644 --- a/crates/loro-core/src/op.rs +++ b/crates/loro-core/src/op.rs @@ -1,6 +1,6 @@ use crate::{ container::ContainerID, - id::ID, + id::{Counter, ID}, span::{CounterSpan, IdSpan}, }; use rle::{HasLength, Mergable, RleVec, Sliceable}; @@ -78,7 +78,7 @@ impl Op { impl Mergable for Op { fn is_mergable(&self, other: &Self, cfg: &()) -> bool { - self.id.is_connected_id(&other.id, self.len() as u32) + self.id.is_connected_id(&other.id, self.len()) && self.content.is_mergable(&other.content, cfg) && self.container == other.container } @@ -124,7 +124,7 @@ impl Sliceable for Op { Op { id: ID { client_id: self.id.client_id, - counter: (self.id.counter + from as u32), + counter: (self.id.counter + from as Counter), }, content, container: self.container.clone(), diff --git a/crates/loro-core/src/op/op_proxy.rs b/crates/loro-core/src/op/op_proxy.rs index 9c999dc5..3be68e02 100644 --- a/crates/loro-core/src/op/op_proxy.rs +++ b/crates/loro-core/src/op/op_proxy.rs @@ -2,14 +2,16 @@ use std::ops::Range; use rle::{HasLength, Sliceable}; -use crate::{container::ContainerID, Change, Lamport, Op, OpContent, OpType, Timestamp, ID}; +use crate::{ + container::ContainerID, id::Counter, Change, Lamport, Op, OpContent, OpType, Timestamp, ID, +}; /// OpProxy represents a slice of an Op pub struct OpProxy<'a> { change: &'a Change, op: &'a Op, /// slice range of the 
op, op[slice_range] - slice_range: Range, + slice_range: Range, } impl PartialEq for OpProxy<'_> { @@ -43,20 +45,21 @@ impl Ord for OpProxy<'_> { } impl<'a> OpProxy<'a> { - pub fn new(change: &'a Change, op: &'a Op, range: Option>) -> Self { + pub fn new(change: &'a Change, op: &'a Op, range: Option>) -> Self { OpProxy { change, op, slice_range: if let Some(range) = range { range } else { - 0..op.len() as u32 + 0..op.len() as Counter }, } } pub fn lamport(&self) -> Lamport { - self.change.lamport + self.op.id.counter - self.change.id.counter + self.slice_range.start + self.change.lamport + self.op.id.counter as Lamport - self.change.id.counter as Lamport + + self.slice_range.start as Lamport } pub fn id(&self) -> ID { @@ -74,7 +77,7 @@ impl<'a> OpProxy<'a> { self.op } - pub fn slice_range(&self) -> &Range { + pub fn slice_range(&self) -> &Range { &self.slice_range } diff --git a/crates/loro-core/src/span.rs b/crates/loro-core/src/span.rs index 9eeb2b7d..e926f699 100644 --- a/crates/loro-core/src/span.rs +++ b/crates/loro-core/src/span.rs @@ -15,6 +15,28 @@ impl CounterSpan { CounterSpan { from, to } } + #[inline] + pub fn from_inclusive(from: Counter, to: Counter) -> Self { + if from <= to { + CounterSpan { from, to: to + 1 } + } else { + CounterSpan { from, to: to - 1 } + } + } + + #[inline] + pub fn reverse(&mut self) { + if self.from == self.to { + return; + } + + if self.from < self.to { + (self.from, self.to) = (self.to - 1, self.from - 1); + } else { + (self.from, self.to) = (self.to + 1, self.from + 1); + } + } + #[inline] pub fn min(&self) -> Counter { if self.from < self.to { @@ -94,13 +116,30 @@ impl Mergable for CounterSpan { } } -#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[derive(Clone, Copy, PartialEq, Eq)] pub struct IdSpan { pub client_id: ClientID, pub counter: CounterSpan, } +impl std::fmt::Debug for IdSpan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str( + format!( + "IdSpan{{{:?}, {} ~ {}}}", + 
self.client_id, self.counter.from, self.counter.to + ) + .as_str(), + ) + } +} + impl IdSpan { + #[inline] + pub fn new(client_id: ClientID, counter: CounterSpan) -> Self { + IdSpan { client_id, counter } + } + #[inline] pub fn min(&self) -> Counter { self.counter.min() From 16395a4fa291f103caea10d91489999272dcd49c Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Wed, 3 Aug 2022 21:56:06 +0800 Subject: [PATCH 08/16] feat: get vv from dag --- crates/loro-core/src/dag.rs | 45 +++++++++++++++++- crates/loro-core/src/dag/test.rs | 29 ++++++++++++ crates/loro-core/src/version.rs | 81 +++++++++++++++++++++++++++++++- 3 files changed, 152 insertions(+), 3 deletions(-) diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index af096b6d..614a88d6 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -11,6 +11,7 @@ use crate::{ change::Lamport, id::{ClientID, Counter, ID}, span::{CounterSpan, IdSpan}, + version::VersionVector, }; pub trait DagNode { @@ -92,11 +93,16 @@ pub(crate) trait Dag { ) } + /// TODO: we probably need cache to speedup this + #[inline] + fn get_vv(&self, id: ID) -> VersionVector { + get_version_vector(&|id| self.get(id).map(|x| x as &dyn DagNode), id) + } + #[inline] fn find_path(&self, from: ID, to: ID) -> Option { let mut ans: Option = None; - #[inline] fn get_rev_path(target: ID, from: ID, to_from_map: &FxHashMap) -> Vec { let mut last_visited: Option = None; let mut a_rev_path = vec![]; @@ -158,6 +164,43 @@ pub(crate) trait Dag { } } +fn get_version_vector<'a, Get>(get: &'a Get, id: ID) -> VersionVector +where + Get: Fn(ID) -> Option<&'a dyn DagNode>, +{ + let mut vv = VersionVector::new(); + let mut visited: FxHashSet = FxHashSet::default(); + vv.insert(id.client_id, id.counter + 1); + let node = get(id).unwrap(); + + if node.deps().is_empty() { + return vv; + } + + let mut stack = Vec::new(); + for dep in node.deps() { + stack.push(dep); + } + + while !stack.is_empty() { + let node_id = 
*stack.pop().unwrap(); + let node = get(node_id).unwrap(); + let node_id_start = node.dag_id_start(); + if !visited.contains(&node_id_start) { + vv.try_update_end(node_id); + for dep in node.deps() { + if !visited.contains(dep) { + stack.push(dep); + } + } + + visited.insert(node_id_start); + } + } + + vv +} + fn find_common_ancestor<'a, F, G>(get: &'a F, a_id: ID, b_id: ID, mut on_found: G) -> Option where F: Fn(ID) -> Option<&'a dyn DagNode>, diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index 1bf5d44b..e6cc9892 100644 --- a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -216,6 +216,35 @@ struct Interaction { len: usize, } +mod get_version_vector { + use super::*; + + #[test] + fn vv() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + a.push(1); + b.push(1); + a.merge(&b); + a.push(1); + let actual = a.get_vv(ID::new(0, 0)); + assert_eq!(actual, vec![ID::new(0, 0)].into()); + let actual = a.get_vv(ID::new(0, 1)); + assert_eq!(actual, vec![ID::new(0, 1), ID::new(1, 0)].into()); + + let mut c = TestDag::new(2); + c.merge(&a); + b.push(1); + c.merge(&b); + c.push(1); + let actual = c.get_vv(ID::new(2, 0)); + assert_eq!( + actual, + vec![ID::new(0, 1), ID::new(1, 1), ID::new(2, 0)].into() + ); + } +} + mod find_path { use super::*; diff --git a/crates/loro-core/src/version.rs b/crates/loro-core/src/version.rs index 153ac9e8..c3a4184e 100644 --- a/crates/loro-core/src/version.rs +++ b/crates/loro-core/src/version.rs @@ -1,8 +1,85 @@ +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, +}; + use fxhash::FxHashMap; -use crate::{change::Lamport, ClientID}; +use crate::{ + change::Lamport, + id::{Counter, ID}, + ClientID, +}; -pub type VersionVector = FxHashMap; +#[repr(transparent)] +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct VersionVector(FxHashMap); + +impl Deref for VersionVector { + type Target = FxHashMap; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl 
DerefMut for VersionVector { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl VersionVector { + #[inline] + pub fn new() -> Self { + Self(FxHashMap::default()) + } + + #[inline] + pub fn set_end(&mut self, id: ID) { + self.0.insert(id.client_id, id.counter + 1); + } + + /// update the end counter of the given client, if the end is greater + /// return whether updated + #[inline] + pub fn try_update_end(&mut self, id: ID) -> bool { + if let Some(end) = self.0.get_mut(&id.client_id) { + if *end < id.counter { + *end = id.counter + 1; + true + } else { + false + } + } else { + self.0.insert(id.client_id, id.counter + 1); + true + } + } +} + +impl Default for VersionVector { + fn default() -> Self { + Self::new() + } +} + +impl From> for VersionVector { + fn from(map: FxHashMap) -> Self { + Self(map) + } +} + +impl From> for VersionVector { + fn from(vec: Vec) -> Self { + let mut vv = VersionVector::new(); + for id in vec { + vv.set_end(id); + } + + vv + } +} #[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord)] pub(crate) struct TotalOrderStamp { From b6d3f6b0b7f63d9ae5d777cd4bbead10b021fbeb Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Wed, 3 Aug 2022 22:08:13 +0800 Subject: [PATCH 09/16] feat: get missing span of vv --- crates/loro-core/src/span.rs | 18 +++++++++++++++--- crates/loro-core/src/version.rs | 16 ++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/crates/loro-core/src/span.rs b/crates/loro-core/src/span.rs index e926f699..5f0cb2e9 100644 --- a/crates/loro-core/src/span.rs +++ b/crates/loro-core/src/span.rs @@ -1,14 +1,22 @@ +use std::fmt::Pointer; + use crate::id::{ClientID, Counter, ID}; use rle::{HasLength, Mergable, Slice, Sliceable}; /// [from, to) /// this is different from [std::ops::Range] because `from` may be greater than `to` -#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[derive(Clone, Copy, PartialEq, Eq)] pub struct CounterSpan { pub from: Counter, pub to: Counter, } +impl 
std::fmt::Debug for CounterSpan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(format!("{}..{}", self.from, self.to).as_str()) + } +} + impl CounterSpan { #[inline] pub fn new(from: Counter, to: Counter) -> Self { @@ -116,6 +124,7 @@ impl Mergable for CounterSpan { } } +/// Span is always a left-closed right-open interval [from, to) #[derive(Clone, Copy, PartialEq, Eq)] pub struct IdSpan { pub client_id: ClientID, @@ -136,8 +145,11 @@ impl std::fmt::Debug for IdSpan { impl IdSpan { #[inline] - pub fn new(client_id: ClientID, counter: CounterSpan) -> Self { - IdSpan { client_id, counter } + pub fn new(client_id: ClientID, from: Counter, to: Counter) -> Self { + IdSpan { + client_id, + counter: CounterSpan::new(from, to), + } } #[inline] diff --git a/crates/loro-core/src/version.rs b/crates/loro-core/src/version.rs index c3a4184e..7f1e3513 100644 --- a/crates/loro-core/src/version.rs +++ b/crates/loro-core/src/version.rs @@ -8,6 +8,7 @@ use fxhash::FxHashMap; use crate::{ change::Lamport, id::{Counter, ID}, + span::IdSpan, ClientID, }; @@ -56,6 +57,21 @@ impl VersionVector { true } } + + pub fn get_missing_span(&self, target: &Self) -> Vec { + let mut ans = vec![]; + for (client_id, other_end) in target.iter() { + if let Some(my_end) = self.get(client_id) { + if my_end < other_end { + ans.push(IdSpan::new(*client_id, *my_end, *other_end)); + } + } else { + ans.push(IdSpan::new(*client_id, 0, *other_end)); + } + } + + ans + } } impl Default for VersionVector { From aa060a93da4519a6e3d9d703f629fe9323ef4df3 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Wed, 3 Aug 2022 22:21:00 +0800 Subject: [PATCH 10/16] feat: cmp vv --- crates/loro-core/src/dag/test.rs | 8 ++-- crates/loro-core/src/version.rs | 69 ++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index e6cc9892..fd516069 100644 --- 
a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -275,8 +275,8 @@ mod find_path { assert_eq!( actual, Some(Path { - retreat: vec![IdSpan::new(0, CounterSpan::new(1, 0))], - forward: vec![IdSpan::new(1, CounterSpan::new(0, 1))], + retreat: vec![IdSpan::new(0, 1, 0)], + forward: vec![IdSpan::new(1, 0, 1)], }) ); } @@ -295,8 +295,8 @@ mod find_path { assert_eq!( actual, Some(Path { - retreat: vec![IdSpan::new(0, CounterSpan::new(3, 2))], - forward: vec![IdSpan::new(1, CounterSpan::new(1, 2))], + retreat: vec![IdSpan::new(0, 3, 2)], + forward: vec![IdSpan::new(1, 1, 2)], }) ); } diff --git a/crates/loro-core/src/version.rs b/crates/loro-core/src/version.rs index 7f1e3513..b36e6e3d 100644 --- a/crates/loro-core/src/version.rs +++ b/crates/loro-core/src/version.rs @@ -1,4 +1,5 @@ use std::{ + cmp::Ordering, collections::HashMap, ops::{Deref, DerefMut}, }; @@ -24,6 +25,48 @@ impl Deref for VersionVector { } } +impl PartialOrd for VersionVector { + fn partial_cmp(&self, other: &Self) -> Option { + let mut self_greater = true; + let mut other_greater = true; + let mut eq = true; + for (client_id, other_end) in other.iter() { + if let Some(self_end) = self.get(client_id) { + if self_end < other_end { + self_greater = false; + eq = false; + } + if self_end > other_end { + other_greater = false; + eq = false; + } + } else { + self_greater = false; + eq = false; + } + } + + for (client_id, _) in self.iter() { + if other.contains_key(client_id) { + continue; + } else { + other_greater = false; + eq = false; + } + } + + if eq { + Some(Ordering::Equal) + } else if self_greater { + Some(Ordering::Greater) + } else if other_greater { + Some(Ordering::Less) + } else { + None + } + } +} + impl DerefMut for VersionVector { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 @@ -102,3 +145,29 @@ pub(crate) struct TotalOrderStamp { pub(crate) lamport: Lamport, pub(crate) client_id: ClientID, } + +#[cfg(test)] +mod tests { + use super::*; + mod cmp { + 
use super::*; + #[test] + fn test() { + let a: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into(); + let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into(); + assert_eq!(a.partial_cmp(&b), Some(Ordering::Equal)); + + let a: VersionVector = vec![ID::new(1, 2), ID::new(2, 1)].into(); + let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into(); + assert_eq!(a.partial_cmp(&b), None); + + let a: VersionVector = vec![ID::new(1, 2), ID::new(2, 3)].into(); + let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into(); + assert_eq!(a.partial_cmp(&b), Some(Ordering::Greater)); + + let a: VersionVector = vec![ID::new(1, 0), ID::new(2, 2)].into(); + let b: VersionVector = vec![ID::new(1, 1), ID::new(2, 2)].into(); + assert_eq!(a.partial_cmp(&b), Some(Ordering::Less)); + } + } +} From 8a15d2e86308aed67149727864ab0e6a457f4597 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Wed, 3 Aug 2022 22:30:20 +0800 Subject: [PATCH 11/16] fix: type err --- crates/loro-framework/src/raw_store.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/loro-framework/src/raw_store.rs b/crates/loro-framework/src/raw_store.rs index ea01d912..7de5ebc5 100644 --- a/crates/loro-framework/src/raw_store.rs +++ b/crates/loro-framework/src/raw_store.rs @@ -1,5 +1,8 @@ use fxhash::FxHashMap; -use loro_core::{id::ClientID, version::VersionVector}; +use loro_core::{ + id::{ClientID, Counter}, + version::VersionVector, +}; use rle::RleVec; use crate::raw_change::{ChangeData, ChangeHash}; @@ -72,9 +75,9 @@ impl RawStore { } pub fn version_vector(&self) -> VersionVector { - let mut version_vector = FxHashMap::default(); + let mut version_vector = VersionVector::new(); for (client_id, changes) in &self.changes { - version_vector.insert(*client_id, changes.len() as u32); + version_vector.insert(*client_id, changes.len() as Counter); } version_vector From b8287837dc57f86b787a288a812fe2a26c871f7f Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 4 Aug 
2022 02:42:00 +0800 Subject: [PATCH 12/16] feat: dag iter --- crates/loro-core/Cargo.toml | 1 + crates/loro-core/src/dag.rs | 20 +++++- crates/loro-core/src/dag/iter.rs | 112 +++++++++++++++++++++++++++++++ crates/loro-core/src/dag/test.rs | 43 +++++++++++- crates/loro-core/src/version.rs | 58 ++++++++++++++-- 5 files changed, 225 insertions(+), 9 deletions(-) create mode 100644 crates/loro-core/src/dag/iter.rs diff --git a/crates/loro-core/Cargo.toml b/crates/loro-core/Cargo.toml index bc5831da..7fad0352 100644 --- a/crates/loro-core/Cargo.toml +++ b/crates/loro-core/Cargo.toml @@ -16,6 +16,7 @@ moveit = "0.5.0" pin-project = "1.0.10" serde = {version = "1.0.140", features = ["derive"]} thiserror = "1.0.31" +im = "15.1.0" [dev-dependencies] proptest = "1.0.0" diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index 614a88d6..d2ac774e 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -4,6 +4,7 @@ use std::{ }; use fxhash::{FxHashMap, FxHashSet}; +mod iter; #[cfg(test)] mod test; @@ -14,6 +15,8 @@ use crate::{ version::VersionVector, }; +use self::iter::{iter_dag, DagIterator}; + pub trait DagNode { fn dag_id_start(&self) -> ID; fn lamport_start(&self) -> Lamport; @@ -64,13 +67,21 @@ fn reverse_path(path: &mut Vec) { } } +/// We have following invariance in DAG +/// - All deps' lamports are smaller than current node's lamport pub(crate) trait Dag { type Node: DagNode; fn get(&self, id: ID) -> Option<&Self::Node>; - fn contains(&self, id: ID) -> bool; + + #[inline] + fn contains(&self, id: ID) -> bool { + self.vv().includes(id) + } + fn frontier(&self) -> &[ID]; fn roots(&self) -> Vec<&Self::Node>; + fn vv(&self) -> VersionVector; // // TODO: Maybe use Result return type @@ -162,6 +173,13 @@ pub(crate) trait Dag { ans } + + fn iter(&self) -> DagIterator<'_, Self::Node> + where + Self: Sized, + { + iter_dag(self) + } } fn get_version_vector<'a, Get>(get: &'a Get, id: ID) -> VersionVector diff --git 
a/crates/loro-core/src/dag/iter.rs b/crates/loro-core/src/dag/iter.rs new file mode 100644 index 00000000..acfe2b2a --- /dev/null +++ b/crates/loro-core/src/dag/iter.rs @@ -0,0 +1,112 @@ +use super::*; + +#[derive(Debug, Clone, PartialEq, Eq)] +struct IdHeapItem { + id: ID, + lamport: Lamport, +} + +impl PartialOrd for IdHeapItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.lamport.cmp(&other.lamport).reverse()) + } +} + +impl Ord for IdHeapItem { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.lamport.cmp(&other.lamport).reverse() + } +} + +pub(crate) fn iter_dag(dag: &dyn Dag) -> DagIterator<'_, T> { + DagIterator { + dag, + vv_map: Default::default(), + visited: VersionVector::new(), + heap: BinaryHeap::new(), + } +} + +pub(crate) struct DagIterator<'a, T> { + dag: &'a dyn Dag, + /// we should keep every nodes starting id inside this map + vv_map: FxHashMap, + /// Because all deps' lamports are smaller than current node's lamport. + /// We can use the lamport to sort the nodes so that each node's deps are processed before itself. + /// + /// The ids in this heap are start ids of nodes. It won't be a id pointing to the middle of a node. 
+ heap: BinaryHeap, + visited: VersionVector, +} + +// TODO: Need benchmark on memory +impl<'a, T: DagNode> Iterator for DagIterator<'a, T> { + type Item = (&'a T, VersionVector); + + fn next(&mut self) -> Option { + if self.vv_map.is_empty() { + if self.dag.vv().len() == 0 { + return None; + } + + for (&client_id, _) in self.dag.vv().iter() { + let vv = VersionVector::new(); + if let Some(node) = self.dag.get(ID::new(client_id, 0)) { + if node.lamport_start() == 0 { + self.vv_map.insert(ID::new(client_id, 0), vv.clone()); + } + + self.heap.push(IdHeapItem { + id: ID::new(client_id, 0), + lamport: node.lamport_start(), + }); + } + + self.visited.insert(client_id, 0); + } + } + + if !self.heap.is_empty() { + let item = self.heap.pop().unwrap(); + let id = item.id; + let node = self.dag.get(id).unwrap(); + debug_assert_eq!(id, node.dag_id_start()); + let mut vv = { + // calculate vv + let mut vv = None; + for &dep_id in node.deps() { + let dep = self.dag.get(dep_id).unwrap(); + let dep_vv = self.vv_map.get(&dep.dag_id_start()).unwrap(); + if vv.is_none() { + vv = Some(dep_vv.clone()); + } else { + vv.as_mut().unwrap().merge(dep_vv); + } + + if dep.dag_id_start() != dep_id { + vv.as_mut().unwrap().set_end(dep_id); + } + } + + vv.unwrap_or_else(VersionVector::new) + }; + + vv.try_update_end(id); + self.vv_map.insert(node.dag_id_start(), vv.clone()); + + // push next node from the same client to the heap + let next_id = id.inc(node.len() as i32); + if self.dag.contains(next_id) { + let next_node = self.dag.get(next_id).unwrap(); + self.heap.push(IdHeapItem { + id: next_id, + lamport: next_node.lamport_start(), + }); + } + + return Some((node, vv)); + } + + None + } +} diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index fd516069..69adb650 100644 --- a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -49,7 +49,7 @@ impl DagNode for TestNode { struct TestDag { nodes: FxHashMap>, frontier: Vec, - version_vec: 
FxHashMap, + version_vec: VersionVector, next_lamport: Lamport, client_id: ClientID, } @@ -77,6 +77,10 @@ impl Dag for TestDag { .and_then(|x| if *x > id.counter { Some(()) } else { None }) .is_some() } + + fn vv(&self) -> VersionVector { + self.version_vec.clone() + } } impl TestDag { @@ -84,7 +88,7 @@ impl TestDag { Self { nodes: FxHashMap::default(), frontier: Vec::new(), - version_vec: FxHashMap::default(), + version_vec: VersionVector::new(), next_lamport: 0, client_id, } @@ -216,6 +220,41 @@ struct Interaction { len: usize, } +mod iter { + use super::*; + + #[test] + fn test() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + // 0-0 + a.push(1); + // 1-0 + b.push(1); + a.merge(&b); + // 0-1 + a.push(1); + b.merge(&a); + // 1-1 + b.push(1); + a.merge(&b); + // 0-2 + a.push(1); + + let mut count = 0; + for (node, vv) in a.iter() { + count += 1; + if node.id == ID::new(0, 0) { + assert_eq!(vv, vec![ID::new(0, 0)].into()); + } else if node.id == ID::new(0, 2) { + assert_eq!(vv, vec![ID::new(0, 2), ID::new(1, 1)].into()); + } + } + + assert_eq!(count, 5); + } +} + mod get_version_vector { use super::*; diff --git a/crates/loro-core/src/version.rs b/crates/loro-core/src/version.rs index b36e6e3d..83005a0e 100644 --- a/crates/loro-core/src/version.rs +++ b/crates/loro-core/src/version.rs @@ -4,7 +4,8 @@ use std::{ ops::{Deref, DerefMut}, }; -use fxhash::FxHashMap; +use fxhash::{FxBuildHasher, FxHashMap}; +use im::hashmap::HashMap as ImHashMap; use crate::{ change::Lamport, @@ -13,12 +14,18 @@ use crate::{ ClientID, }; +/// It's a immutable hash map with O(1) clone. Because +/// - we want a cheap clone op on vv; +/// - neighbor op's VersionVectors are very similar, most of the memory can be shared in +/// immutable hashmap +/// +/// see also [im]. 
#[repr(transparent)] #[derive(Debug, PartialEq, Eq, Clone)] -pub struct VersionVector(FxHashMap); +pub struct VersionVector(ImHashMap); impl Deref for VersionVector { - type Target = FxHashMap; + type Target = ImHashMap; fn deref(&self) -> &Self::Target { &self.0 @@ -76,7 +83,7 @@ impl DerefMut for VersionVector { impl VersionVector { #[inline] pub fn new() -> Self { - Self(FxHashMap::default()) + Self(ImHashMap::new()) } #[inline] @@ -89,7 +96,7 @@ impl VersionVector { #[inline] pub fn try_update_end(&mut self, id: ID) -> bool { if let Some(end) = self.0.get_mut(&id.client_id) { - if *end < id.counter { + if *end < id.counter + 1 { *end = id.counter + 1; true } else { @@ -115,6 +122,27 @@ impl VersionVector { ans } + + pub fn merge(&mut self, other: &Self) { + for (&client_id, &other_end) in other.iter() { + if let Some(my_end) = self.get_mut(&client_id) { + if *my_end < other_end { + *my_end = other_end; + } + } else { + self.0.insert(client_id, other_end); + } + } + } + + pub fn includes(&mut self, id: ID) -> bool { + if let Some(end) = self.get_mut(&id.client_id) { + if *end > id.counter { + return true; + } + } + false + } } impl Default for VersionVector { @@ -125,7 +153,11 @@ impl Default for VersionVector { impl From> for VersionVector { fn from(map: FxHashMap) -> Self { - Self(map) + let mut im_map = ImHashMap::new(); + for (client_id, counter) in map { + im_map.insert(client_id, counter); + } + Self(im_map) } } @@ -170,4 +202,18 @@ mod tests { assert_eq!(a.partial_cmp(&b), Some(Ordering::Less)); } } + + #[test] + fn im() { + let mut a = VersionVector::new(); + a.set_end(ID::new(1, 1)); + a.set_end(ID::new(2, 1)); + let mut b = a.clone(); + b.merge(&vec![ID::new(1, 2), ID::new(2, 2)].into()); + assert!(a != b); + assert_eq!(a.get(&1), Some(&2)); + assert_eq!(a.get(&2), Some(&2)); + assert_eq!(b.get(&1), Some(&3)); + assert_eq!(b.get(&2), Some(&3)); + } } From 77065bf57ee3af4d44a063615fe9eda36d2829cb Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 4 
Aug 2022 12:51:28 +0800 Subject: [PATCH 13/16] feat: mermaid --- crates/loro-core/Cargo.toml | 1 + crates/loro-core/src/dag.rs | 42 ++++++--- crates/loro-core/src/dag/iter.rs | 79 ++++++++++++++-- crates/loro-core/src/dag/mermaid.rs | 135 ++++++++++++++++++++++++++++ crates/loro-core/src/dag/test.rs | 45 +++++++++- 5 files changed, 282 insertions(+), 20 deletions(-) create mode 100644 crates/loro-core/src/dag/mermaid.rs diff --git a/crates/loro-core/Cargo.toml b/crates/loro-core/Cargo.toml index 7fad0352..c2cc1374 100644 --- a/crates/loro-core/Cargo.toml +++ b/crates/loro-core/Cargo.toml @@ -21,6 +21,7 @@ im = "15.1.0" [dev-dependencies] proptest = "1.0.0" proptest-derive = "0.3.0" +rand = "0.8.5" # See https://matklad.github.io/2021/02/27/delete-cargo-integration-tests.html [lib] diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index d2ac774e..cdd3fc66 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -5,6 +5,7 @@ use std::{ use fxhash::{FxHashMap, FxHashSet}; mod iter; +mod mermaid; #[cfg(test)] mod test; @@ -15,10 +16,13 @@ use crate::{ version::VersionVector, }; -use self::iter::{iter_dag, DagIterator}; +use self::{ + iter::{iter_dag, iter_dag_with_vv, DagIterator, DagIteratorVV}, + mermaid::dag_to_mermaid, +}; pub trait DagNode { - fn dag_id_start(&self) -> ID; + fn id_start(&self) -> ID; fn lamport_start(&self) -> Lamport; fn len(&self) -> usize; fn deps(&self) -> &Vec; @@ -29,8 +33,8 @@ pub trait DagNode { } #[inline] - fn dag_id_span(&self) -> IdSpan { - let id = self.dag_id_start(); + fn id_span(&self) -> IdSpan { + let id = self.id_start(); IdSpan { client_id: id.client_id, counter: CounterSpan::new(id.counter, id.counter + self.len() as Counter), @@ -39,8 +43,8 @@ pub trait DagNode { /// inclusive end #[inline] - fn dag_id_end(&self) -> ID { - let id = self.dag_id_start(); + fn id_end(&self) -> ID { + let id = self.id_start(); ID { client_id: id.client_id, counter: id.counter + self.len() as Counter - 1, 
@@ -49,7 +53,7 @@ pub trait DagNode { #[inline] fn get_lamport_from_counter(&self, c: Counter) -> Lamport { - self.lamport_start() + c as Lamport - self.dag_id_start().counter as Lamport + self.lamport_start() + c as Lamport - self.id_start().counter as Lamport } } @@ -174,12 +178,26 @@ pub(crate) trait Dag { ans } + fn iter_with_vv(&self) -> DagIteratorVV<'_, Self::Node> + where + Self: Sized, + { + iter_dag_with_vv(self) + } + fn iter(&self) -> DagIterator<'_, Self::Node> where Self: Sized, { iter_dag(self) } + + fn mermaid(&self) -> String + where + Self: Sized, + { + dag_to_mermaid(self) + } } fn get_version_vector<'a, Get>(get: &'a Get, id: ID) -> VersionVector @@ -203,7 +221,7 @@ where while !stack.is_empty() { let node_id = *stack.pop().unwrap(); let node = get(node_id).unwrap(); - let node_id_start = node.dag_id_start(); + let node_id_start = node.id_start(); if !visited.contains(&node_id_start) { vv.try_update_end(node_id); for dep in node.deps() { @@ -219,6 +237,10 @@ where vv } +// fn mermaid(dag: &impl Dag) -> String { +// dag +// } + fn find_common_ancestor<'a, F, G>(get: &'a F, a_id: ID, b_id: ID, mut on_found: G) -> Option where F: Fn(ID) -> Option<&'a dyn DagNode>, @@ -346,8 +368,8 @@ where deps: dep.deps(), }); a_path.insert(*dep_id, a.id); - if dep.dag_id_start() != *dep_id { - a_path.insert(dep.dag_id_start(), *dep_id); + if dep.id_start() != *dep_id { + a_path.insert(dep.id_start(), *dep_id); } if let Some(v) = a_vv.get_mut(&dep_id.client_id) { diff --git a/crates/loro-core/src/dag/iter.rs b/crates/loro-core/src/dag/iter.rs index acfe2b2a..63837c09 100644 --- a/crates/loro-core/src/dag/iter.rs +++ b/crates/loro-core/src/dag/iter.rs @@ -18,8 +18,8 @@ impl Ord for IdHeapItem { } } -pub(crate) fn iter_dag(dag: &dyn Dag) -> DagIterator<'_, T> { - DagIterator { +pub(crate) fn iter_dag_with_vv(dag: &dyn Dag) -> DagIteratorVV<'_, T> { + DagIteratorVV { dag, vv_map: Default::default(), visited: VersionVector::new(), @@ -27,7 +27,70 @@ pub(crate) fn 
iter_dag(dag: &dyn Dag) -> DagIterator<'_, T> { } } -pub(crate) struct DagIterator<'a, T> { +pub(crate) fn iter_dag(dag: &dyn Dag) -> DagIterator<'_, T> { + DagIterator { + dag, + visited: VersionVector::new(), + heap: BinaryHeap::new(), + } +} + +pub struct DagIterator<'a, T> { + dag: &'a dyn Dag, + /// Because all deps' lamports are smaller than current node's lamport. + /// We can use the lamport to sort the nodes so that each node's deps are processed before itself. + /// + /// The ids in this heap are start ids of nodes. It won't be a id pointing to the middle of a node. + heap: BinaryHeap, + visited: VersionVector, +} + +// TODO: Need benchmark on memory +impl<'a, T: DagNode> Iterator for DagIterator<'a, T> { + type Item = &'a T; + + fn next(&mut self) -> Option { + if self.visited.is_empty() { + if self.dag.vv().len() == 0 { + return None; + } + + for (&client_id, _) in self.dag.vv().iter() { + if let Some(node) = self.dag.get(ID::new(client_id, 0)) { + self.heap.push(IdHeapItem { + id: ID::new(client_id, 0), + lamport: node.lamport_start(), + }); + } + + self.visited.insert(client_id, 0); + } + } + + if !self.heap.is_empty() { + let item = self.heap.pop().unwrap(); + let id = item.id; + let node = self.dag.get(id).unwrap(); + debug_assert_eq!(id, node.id_start()); + + // push next node from the same client to the heap + let next_id = id.inc(node.len() as i32); + if self.dag.contains(next_id) { + let next_node = self.dag.get(next_id).unwrap(); + self.heap.push(IdHeapItem { + id: next_id, + lamport: next_node.lamport_start(), + }); + } + + return Some(node); + } + + None + } +} + +pub(crate) struct DagIteratorVV<'a, T> { dag: &'a dyn Dag, /// we should keep every nodes starting id inside this map vv_map: FxHashMap, @@ -40,7 +103,7 @@ pub(crate) struct DagIterator<'a, T> { } // TODO: Need benchmark on memory -impl<'a, T: DagNode> Iterator for DagIterator<'a, T> { +impl<'a, T: DagNode> Iterator for DagIteratorVV<'a, T> { type Item = (&'a T, VersionVector); fn 
next(&mut self) -> Option { @@ -70,20 +133,20 @@ impl<'a, T: DagNode> Iterator for DagIterator<'a, T> { let item = self.heap.pop().unwrap(); let id = item.id; let node = self.dag.get(id).unwrap(); - debug_assert_eq!(id, node.dag_id_start()); + debug_assert_eq!(id, node.id_start()); let mut vv = { // calculate vv let mut vv = None; for &dep_id in node.deps() { let dep = self.dag.get(dep_id).unwrap(); - let dep_vv = self.vv_map.get(&dep.dag_id_start()).unwrap(); + let dep_vv = self.vv_map.get(&dep.id_start()).unwrap(); if vv.is_none() { vv = Some(dep_vv.clone()); } else { vv.as_mut().unwrap().merge(dep_vv); } - if dep.dag_id_start() != dep_id { + if dep.id_start() != dep_id { vv.as_mut().unwrap().set_end(dep_id); } } @@ -92,7 +155,7 @@ impl<'a, T: DagNode> Iterator for DagIterator<'a, T> { }; vv.try_update_end(id); - self.vv_map.insert(node.dag_id_start(), vv.clone()); + self.vv_map.insert(node.id_start(), vv.clone()); // push next node from the same client to the heap let next_id = id.inc(node.len() as i32); diff --git a/crates/loro-core/src/dag/mermaid.rs b/crates/loro-core/src/dag/mermaid.rs new file mode 100644 index 00000000..c2799f99 --- /dev/null +++ b/crates/loro-core/src/dag/mermaid.rs @@ -0,0 +1,135 @@ +use rle::RleVec; + +use super::*; +struct BreakPoints { + vv: VersionVector, + break_points: FxHashMap>, + /// start ID to ID. The target ID may be in the middle of an op. + /// + /// only includes links across different clients + links: FxHashMap, +} + +struct Output { + clients: FxHashMap>, + /// start ID to start ID. + /// + /// only includes links across different clients + links: FxHashMap, +} + +fn to_str(output: Output) -> String { + let mut s = String::new(); + let mut indent_level = 0; + macro_rules! 
new_line { + () => { + s += "\n"; + for _ in 0..indent_level { + s += " "; + } + }; + } + s += "flowchart RL"; + indent_level += 1; + new_line!(); + for (client_id, spans) in output.clients.iter() { + s += format!("subgraph client{}", client_id).as_str(); + new_line!(); + let mut is_first = true; + for id_span in spans.iter().rev() { + if !is_first { + s += " --> " + } + + is_first = false; + s += format!( + "{}-{}(ctr: {}..{})", + id_span.client_id, id_span.counter.from, id_span.counter.from, id_span.counter.to + ) + .as_str(); + } + + new_line!(); + s += "end"; + new_line!(); + new_line!(); + } + + for (id_from, id_to) in output.links.iter() { + s += format!( + "{}-{} --> {}-{}", + id_from.client_id, id_from.counter, id_to.client_id, id_to.counter + ) + .as_str(); + new_line!(); + } + + s +} + +fn break_points_to_output(input: BreakPoints) -> Output { + let mut output = Output { + clients: FxHashMap::default(), + links: FxHashMap::default(), + }; + let breaks: FxHashMap> = input + .break_points + .into_iter() + .map(|(client_id, set)| { + let mut vec: Vec = set.iter().cloned().collect(); + vec.sort(); + (client_id, vec) + }) + .collect(); + for (client_id, break_points) in breaks.iter() { + let mut spans = vec![]; + for (from, to) in break_points.iter().zip(break_points.iter().skip(1)) { + spans.push(IdSpan::new(*client_id, *from, *to)); + } + output.clients.insert(*client_id, spans); + } + + for (id_from, id_to) in input.links.iter() { + let client_breaks = breaks.get(&id_to.client_id).unwrap(); + match client_breaks.binary_search(&id_to.counter) { + Ok(_) => { + output.links.insert(*id_from, *id_to); + } + Err(index) => { + output + .links + .insert(*id_from, ID::new(id_to.client_id, client_breaks[index - 1])); + } + } + } + output +} + +fn get_dag_break_points(dag: &impl Dag) -> BreakPoints { + let mut break_points = BreakPoints { + vv: dag.vv(), + break_points: FxHashMap::default(), + links: FxHashMap::default(), + }; + + for node in dag.iter() { + let id = 
node.id_start(); + let set = break_points.break_points.entry(id.client_id).or_default(); + set.insert(id.counter); + set.insert(id.counter + node.len() as Counter); + for dep in node.deps() { + break_points + .break_points + .entry(dep.client_id) + .or_default() + .insert(dep.counter); + break_points.links.insert(id, *dep); + } + } + + break_points +} + +pub(crate) fn dag_to_mermaid(dag: &impl Dag) -> String { + to_str(break_points_to_output(get_dag_break_points(dag))) +} diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index 69adb650..57c879d0 100644 --- a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -31,7 +31,7 @@ impl TestNode { } impl DagNode for TestNode { - fn dag_id_start(&self) -> ID { + fn id_start(&self) -> ID { self.id } fn lamport_start(&self) -> Lamport { @@ -242,7 +242,7 @@ mod iter { a.push(1); let mut count = 0; - for (node, vv) in a.iter() { + for (node, vv) in a.iter_with_vv() { count += 1; if node.id == ID::new(0, 0) { assert_eq!(vv, vec![ID::new(0, 0)].into()); @@ -255,6 +255,47 @@ mod iter { } } +mod mermaid { + use super::*; + + #[test] + fn simple() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + // 0-0 + a.push(1); + // 1-0 + b.push(1); + a.merge(&b); + // 0-1 + a.push(1); + b.merge(&a); + // 1-1 + b.push(1); + a.merge(&b); + // 0-2 + a.push(1); + + println!("{}", a.mermaid()); + } + + #[test] + fn three() { + let mut a = TestDag::new(0); + let mut b = TestDag::new(1); + let mut c = TestDag::new(2); + a.push(10); + b.merge(&a); + b.push(3); + c.merge(&b); + c.push(4); + a.merge(&c); + a.push(2); + b.merge(&a); + println!("{}", b.mermaid()); + } +} + mod get_version_vector { use super::*; From d1e135ec305ecca14b8efbf4971bc6eeded7f17c Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 4 Aug 2022 13:10:09 +0800 Subject: [PATCH 14/16] test: mermaid --- crates/loro-core/src/dag/mermaid.rs | 4 +++ crates/loro-core/src/dag/test.rs | 49 +++++++++++++++++++++++++++++ 2 
files changed, 53 insertions(+) diff --git a/crates/loro-core/src/dag/mermaid.rs b/crates/loro-core/src/dag/mermaid.rs index c2799f99..d8c7685b 100644 --- a/crates/loro-core/src/dag/mermaid.rs +++ b/crates/loro-core/src/dag/mermaid.rs @@ -118,6 +118,10 @@ fn get_dag_break_points(dag: &impl Dag) -> BreakPoints { set.insert(id.counter); set.insert(id.counter + node.len() as Counter); for dep in node.deps() { + if dep.client_id == id.client_id { + continue; + } + break_points .break_points .entry(dep.client_id) diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index 57c879d0..75de0fd1 100644 --- a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -4,6 +4,7 @@ use proptest::proptest; use super::*; use crate::{ + array_mut_ref, change::Lamport, id::{ClientID, Counter, ID}, span::{CounterSpan, IdSpan}, @@ -220,6 +221,37 @@ struct Interaction { len: usize, } +impl Interaction { + fn gen(rng: &mut impl rand::Rng, num: usize) -> Self { + if rng.gen_bool(0.5) { + let dag_idx = rng.gen_range(0..num); + let merge_with = (rng.gen_range(1..num - 1) + dag_idx) % num; + Self { + dag_idx, + merge_with: Some(merge_with), + len: rng.gen_range(1..10), + } + } else { + Self { + dag_idx: rng.gen_range(0..num), + merge_with: None, + len: rng.gen_range(1..10), + } + } + } + + fn apply(&self, dags: &mut [TestDag]) { + if let Some(merge_with) = self.merge_with { + if merge_with != self.dag_idx { + let (from, to) = array_mut_ref!(dags, [self.dag_idx, merge_with]); + from.merge(to); + } + } + + dags[self.dag_idx].push(self.len); + } +} + mod iter { use super::*; @@ -256,6 +288,8 @@ mod iter { } mod mermaid { + use rle::Mergable; + use super::*; #[test] @@ -294,6 +328,21 @@ mod mermaid { b.merge(&a); println!("{}", b.mermaid()); } + + #[test] + fn gen() { + let num = 5; + let mut rng = rand::thread_rng(); + let mut dags = (0..num).map(TestDag::new).collect::>(); + for _ in 0..100 { + Interaction::gen(&mut rng, num as usize).apply(&mut 
dags); + } + for i in 1..num { + let (a, other) = array_mut_ref!(&mut dags, [0, i as usize]); + a.merge(other); + } + println!("{}", dags[0].mermaid()); + } } mod get_version_vector { From bb3eb7b7a077a017676c3959e80573883e3d3bf2 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 4 Aug 2022 13:21:49 +0800 Subject: [PATCH 15/16] fix: refine mermaid diagram style --- crates/loro-core/src/dag.rs | 2 ++ crates/loro-core/src/dag/mermaid.rs | 8 ++++++-- crates/loro-core/src/dag/test.rs | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index cdd3fc66..219d25ac 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -192,6 +192,8 @@ pub(crate) trait Dag { iter_dag(self) } + /// You can visualize and generate img link at https://mermaid.live/ + #[inline] fn mermaid(&self) -> String where Self: Sized, diff --git a/crates/loro-core/src/dag/mermaid.rs b/crates/loro-core/src/dag/mermaid.rs index d8c7685b..ea28606a 100644 --- a/crates/loro-core/src/dag/mermaid.rs +++ b/crates/loro-core/src/dag/mermaid.rs @@ -43,8 +43,12 @@ fn to_str(output: Output) -> String { is_first = false; s += format!( - "{}-{}(ctr: {}..{})", - id_span.client_id, id_span.counter.from, id_span.counter.from, id_span.counter.to + "{}-{}(\"c{}: [{}, {})\")", + id_span.client_id, + id_span.counter.from, + id_span.client_id, + id_span.counter.from, + id_span.counter.to ) .as_str(); } diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index 75de0fd1..42507ab0 100644 --- a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -473,7 +473,7 @@ mod find_common_ancestors { ); } - /// ![](https://i.ibb.co/C5xLG53/image.png) + /// 
![](https://mermaid.ink/img/pako:eNqNkTFPwzAQhf_K6SYqOZJ9CYsHJroxwYgXY7skInEq1xFCVf87jg5XVQQSnk6fz_feO5_RzT6gxsM4f7repgzPTyZCOafl7T3ZYw9uHELMkqls2juDTmp4bQV0O4M7aJqHwqlyEtDecFW5EkA3XFYuBaiVs0CInotfXSimqunW16q87gTcX6cqdqe27hSrKVZr_6tGTImn0nYqcWbaZiZWI1ajP9WK2zqnClFd5jVn3SIKnEKa7ODLb53Xa4O5D1MwqEvpbfowaOKl9C1Hb3PY-yHPCfXBjqcg0C55fvmKDnVOS6hNj4Mtgaefrss3dp6HFg) #[test] fn large_lamport_with_longer_path() { let mut a0 = TestDag::new(0); From d3abb895f26bb1588ad50afa34d1aaa20a69ad59 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 4 Aug 2022 15:26:38 +0800 Subject: [PATCH 16/16] docs: update doc --- crates/loro-core/src/change.rs | 6 ++++++ crates/loro-core/src/dag.rs | 12 +++++++++++- crates/loro-core/src/lib.rs | 6 ++++-- crates/loro-core/src/log_store.rs | 5 +++++ crates/loro-core/src/version.rs | 8 +++++++- crates/rle/src/lib.rs | 20 ++++++++++++++++++++ 6 files changed, 53 insertions(+), 4 deletions(-) diff --git a/crates/loro-core/src/change.rs b/crates/loro-core/src/change.rs index d85d9765..771b3749 100644 --- a/crates/loro-core/src/change.rs +++ b/crates/loro-core/src/change.rs @@ -1,3 +1,9 @@ +//! [Change]s are merged ops. +//! +//! Every [Change] has deps on other [Change]s. All [Change]s in the document thus form a DAG. +//! Note, `dep` may point to the middle of the other [Change]. +//! +//! In future, we may also use [Change] to represent a transaction. But this decision is postponed. use std::char::MAX; use crate::{ diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index 219d25ac..33e45282 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -1,3 +1,11 @@ +//! DAG (Directed Acyclic Graph) is a common data structure in distributed system. +//! +//! This mod contains the DAGs in our CRDT. It's not a general DAG, it has some specific properties that +//! we used to optimize the speed: +//! - Each node has lamport clock. +//! - Each node has its ID (client_id, counter). +//! 
- We use ID to refer to node rather than content addressing (hash) +//! use std::{ collections::{BinaryHeap, HashMap, HashSet, VecDeque}, ops::Range, @@ -21,7 +29,7 @@ use self::{ mermaid::dag_to_mermaid, }; -pub trait DagNode { +pub(crate) trait DagNode { fn id_start(&self) -> ID; fn lamport_start(&self) -> Lamport; fn len(&self) -> usize; @@ -71,6 +79,8 @@ fn reverse_path(path: &mut Vec) { } } +/// Dag (Directed Acyclic Graph). +/// /// We have following invariance in DAG /// - All deps' lamports are smaller than current node's lamport pub(crate) trait Dag { diff --git a/crates/loro-core/src/lib.rs b/crates/loro-core/src/lib.rs index 6d24dede..b695e772 100644 --- a/crates/loro-core/src/lib.rs +++ b/crates/loro-core/src/lib.rs @@ -1,4 +1,5 @@ -//! # Loro +//! loro-core is a CRDT framework. +//! //! //! #![allow(dead_code, unused_imports)] @@ -8,11 +9,11 @@ pub mod configure; pub mod container; pub mod dag; pub mod id; +pub mod log_store; pub mod op; pub mod version; mod error; -mod log_store; mod loro; mod smstring; mod snapshot; @@ -32,5 +33,6 @@ pub use container::ContainerType; pub use log_store::LogStore; pub use loro::*; pub use value::LoroValue; +pub use version::VersionVector; use string_cache::DefaultAtom; diff --git a/crates/loro-core/src/log_store.rs b/crates/loro-core/src/log_store.rs index 453e9388..3f646e17 100644 --- a/crates/loro-core/src/log_store.rs +++ b/crates/loro-core/src/log_store.rs @@ -1,3 +1,6 @@ +//! [LogStore] stores all the [Change]s and [Op]s. It's also a [DAG][crate::dag]; +//! +//! mod iter; use pin_project::pin_project; use std::{collections::BinaryHeap, marker::PhantomPinned, pin::Pin, ptr::NonNull}; @@ -38,6 +41,8 @@ impl Default for GcConfig { /// Entry of the loro inner state. /// This is a self-referential structure. So it need to be pinned. 
+/// +/// `frontier`s are the Changes without children in the DAG (there is no dep pointing to them) #[pin_project] pub struct LogStore { changes: FxHashMap>, diff --git a/crates/loro-core/src/version.rs b/crates/loro-core/src/version.rs index 83005a0e..7cd216f2 100644 --- a/crates/loro-core/src/version.rs +++ b/crates/loro-core/src/version.rs @@ -14,7 +14,13 @@ use crate::{ ClientID, }; -/// It's a immutable hash map with O(1) clone. Because +/// [VersionVector](https://en.wikipedia.org/wiki/Version_vector) +/// +/// It's a map from [ClientID] to [Counter]. It's a right-open interval. +/// i.e. a [VersionVector] of `{A: 1, B: 2}` means that A has 1 atomic op and B has 2 atomic ops, +/// thus ID of `{client: A, counter: 1}` is out of the range. +/// +/// In implementation, it's an immutable hash map with O(1) clone. Because /// - we want a cheap clone op on vv; /// - neighbor op's VersionVectors are very similar, most of the memory can be shared in /// immutable hashmap diff --git a/crates/rle/src/lib.rs b/crates/rle/src/lib.rs index 368ecab7..f83e87b1 100644 --- a/crates/rle/src/lib.rs +++ b/crates/rle/src/lib.rs @@ -1,3 +1,23 @@ +//! Run length encoding library. +//! +//! There are many mergeable types. By merging them together we can get a more compact representation of the data. +//! For example, in many cases, `[0..5, 5..10]` can be merged into `0..10`. +//! +//! # RleVec +//! +//! RleVec is a vector that can be compressed using run-length encoding. +//! +//! A T value may be merged with its neighbors. When we push a new element, the new value +//! may be merged with the last element in the array. Each value has a length, so there +//! are two types of indexes: +//! 1. (merged) It refers to the index of the merged element. +//! 2. (atom) The index of substantial elements. It refers to the index of the atom element. +//! +//! By default, we use atom index in RleVec. +//! - len() returns the number of atom elements in the array. +//!
- get(index) returns the atom element at the index. +//! - slice(from, to) returns a slice of atom elements from index `from` to index `to`. +//! mod rle_trait; mod rle_vec; pub use crate::rle_trait::{HasLength, Mergable, Rle, Slice, Sliceable};