From 14178eeb2ca9a2a8e7b8859baf23e754acba0203 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 16 Aug 2024 15:29:51 +0800 Subject: [PATCH] refactor: move handle new change inside loro_dag --- crates/loro-internal/src/oplog.rs | 80 +----------------- crates/loro-internal/src/oplog/loro_dag.rs | 97 +++++++++++++++++++++- package.json | 2 +- 3 files changed, 98 insertions(+), 81 deletions(-) diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index ed766871..b8560578 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -31,6 +31,7 @@ use rle::{HasLength, Mergable, RleVec, Sliceable}; use smallvec::SmallVec; use self::iter::MergedChangeIter; +use self::loro_dag::EnsureDagNodeDepsAreAtTheEnd; pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded}; use self::pending_changes::PendingChanges; @@ -72,8 +73,6 @@ impl std::fmt::Debug for OpLog { } } -pub(crate) struct EnsureDagNodeDepsAreAtTheEnd; - impl OpLog { #[inline] pub(crate) fn new() -> Self { @@ -230,82 +229,7 @@ impl OpLog { &mut self, change: &Change, ) -> EnsureDagNodeDepsAreAtTheEnd { - let len = change.content_len(); - self.dag.update_frontiers(change.id_last(), &change.deps); - if change.deps_on_self() { - // don't need to push new element to dag because it only depends on itself - self.dag.with_last_mut_of_peer(change.id.peer, |last| { - let last = last.unwrap(); - assert_eq!(last.peer, change.id.peer, "peer id is not the same"); - assert_eq!( - last.cnt + last.len as Counter, - change.id.counter, - "counter is not continuous" - ); - assert_eq!( - last.lamport + last.len as Lamport, - change.lamport, - "lamport is not continuous" - ); - last.len = (change.id.counter - last.cnt) as usize + len; - last.has_succ = false; - }); - } else { - let vv = self.dag.frontiers_to_im_vv(&change.deps); - let mut pushed = false; - let node = AppDagNode { - vv: OnceCell::from(vv), - peer: change.id.peer, - cnt: change.id.counter, - lamport: change.lamport, - deps: change.deps.clone(), - has_succ: false, - len, - }; - self.dag.with_last_mut_of_peer(change.id.peer, |last| { - if let Some(last) = last { - if change.id.counter > 0 { - assert_eq!( - last.ctr_end(), - change.id.counter, - "counter is not continuous" - ); - } - - if last.is_mergable(&node, &()) { - pushed = true; - last.merge(&node, &()); - } - } - }); - - if !pushed { - self.dag.insert(node.id_start(), node); - } - - for dep in change.deps.iter() { - let ans = self.dag.with_node_mut(*dep, |target| { - let target = target.unwrap(); - if target.ctr_last() == dep.counter { - target.has_succ = true; - None - } else { - // We need to split the target node into two part - // so that we can ensure the new change depends on the - // last id of a dag node. - let new_node = - target.slice(dep.counter as usize - target.cnt as usize, target.len); - target.len -= new_node.len; - Some(new_node) - } - }); - if let Some(new_node) = ans { - self.dag.insert(new_node.id_start(), new_node); - } - } - } - - EnsureDagNodeDepsAreAtTheEnd + self.dag.handle_new_change(change) } /// Trim the known part of change diff --git a/crates/loro-internal/src/oplog/loro_dag.rs b/crates/loro-internal/src/oplog/loro_dag.rs index bbcd3350..8a9a7cef 100644 --- a/crates/loro-internal/src/oplog/loro_dag.rs +++ b/crates/loro-internal/src/oplog/loro_dag.rs @@ -7,8 +7,9 @@ use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, PeerID}; use once_cell::sync::OnceCell; use rle::{HasIndex, HasLength, Mergable, Sliceable}; use std::cmp::Ordering; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Display; +use std::marker::PhantomData; use std::sync::Mutex; use super::ChangeStore; @@ -33,6 +34,14 @@ pub struct AppDag { /// /// - `vv` >= `unparsed_vv` unparsed_vv: VersionVector, + /// It's a set of points which are deps of some parsed ops. + /// But the ops in this set are not parsed yet. When they are parsed, + /// we need to make sure it breaks at the given point. + unhandled_dep_points: BTreeSet, +} + +pub(crate) struct EnsureDagNodeDepsAreAtTheEnd { + _private: PhantomData<()>, } #[derive(Debug, Clone)] @@ -56,6 +65,7 @@ impl AppDag { frontiers: Frontiers::default(), vv: VersionVector::default(), unparsed_vv: VersionVector::default(), + unhandled_dep_points: BTreeSet::new(), } } @@ -71,7 +81,89 @@ impl AppDag { self.vv.is_empty() } - pub fn insert(&self, id: ID, node: AppDagNode) { + pub(super) fn handle_new_change(&mut self, change: &Change) -> EnsureDagNodeDepsAreAtTheEnd { + let len = change.content_len(); + self.update_frontiers(change.id_last(), &change.deps); + if change.deps_on_self() { + // don't need to push new element to dag because it only depends on itself + self.with_last_mut_of_peer(change.id.peer, |last| { + let last = last.unwrap(); + assert_eq!(last.peer, change.id.peer, "peer id is not the same"); + assert_eq!( + last.cnt + last.len as Counter, + change.id.counter, + "counter is not continuous" + ); + assert_eq!( + last.lamport + last.len as Lamport, + change.lamport, + "lamport is not continuous" + ); + last.len = (change.id.counter - last.cnt) as usize + len; + last.has_succ = false; + }); + } else { + let vv = self.frontiers_to_im_vv(&change.deps); + let mut pushed = false; + let node = AppDagNode { + vv: OnceCell::from(vv), + peer: change.id.peer, + cnt: change.id.counter, + lamport: change.lamport, + deps: change.deps.clone(), + has_succ: false, + len, + }; + + self.with_last_mut_of_peer(change.id.peer, |last| { + if let Some(last) = last { + if change.id.counter > 0 { + assert_eq!( + last.ctr_end(), + change.id.counter, + "counter is not continuous" + ); + } + + if last.is_mergable(&node, &()) { + pushed = true; + last.merge(&node, &()); + } + } + }); + + if !pushed { + self.insert(node.id_start(), node); + } + + for dep in change.deps.iter() { + let ans = self.with_node_mut(*dep, |target| { + let target = target.unwrap(); + if target.ctr_last() == dep.counter { + target.has_succ = true; + None + } else { + // We need to split the target node into two part + // so that we can ensure the new change depends on the + // last id of a dag node. + let new_node = + target.slice(dep.counter as usize - target.cnt as usize, target.len); + target.len -= new_node.len; + Some(new_node) + } + }); + if let Some(new_node) = ans { + self.insert(new_node.id_start(), new_node); + } + } + } + + EnsureDagNodeDepsAreAtTheEnd { + _private: PhantomData, + } + } + + fn insert(&self, id: ID, node: AppDagNode) { self.map.lock().unwrap().insert(id, node); } @@ -163,6 +255,7 @@ impl AppDag { frontiers: self.frontiers.clone(), vv: self.vv.clone(), unparsed_vv: self.unparsed_vv.clone(), + unhandled_dep_points: self.unhandled_dep_points.clone(), } } diff --git a/package.json b/package.json index 7d9f888f..23b10ebf 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "check-all": "cargo hack check --each-feature", "build": "cargo build", "test": "cargo nextest run --features=test_utils && cargo test --doc", - "test-all": "deno task test & deno task test-wasm", + "test-all": "nr test && nr test-wasm", "test-wasm": "cd crates/loro-wasm && deno task dev && cd ../../loro-js && pnpm i && pnpm run test", "coverage": "mkdir -p coverage && cargo llvm-cov nextest --features test_utils --lcov > coverage/lcov-nextest.info && cargo llvm-cov report", "release-wasm": "cd crates/loro-wasm && deno task release && cd ../../loro-js && pnpm i && pnpm build && pnpm run test",