refactor: move handle new change inside loro_dag

Zixuan Chen 2024-08-16 15:29:51 +08:00
parent 7a460414d4
commit 14178eeb2c
3 changed files with 98 additions and 81 deletions


@@ -31,6 +31,7 @@ use rle::{HasLength, Mergable, RleVec, Sliceable};
use smallvec::SmallVec;
use self::iter::MergedChangeIter;
use self::loro_dag::EnsureDagNodeDepsAreAtTheEnd;
pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded};
use self::pending_changes::PendingChanges;
@@ -72,8 +73,6 @@ impl std::fmt::Debug for OpLog {
}
}
pub(crate) struct EnsureDagNodeDepsAreAtTheEnd;
impl OpLog {
#[inline]
pub(crate) fn new() -> Self {
@@ -230,82 +229,7 @@ impl OpLog {
&mut self,
change: &Change,
) -> EnsureDagNodeDepsAreAtTheEnd {
let len = change.content_len();
self.dag.update_frontiers(change.id_last(), &change.deps);
if change.deps_on_self() {
// No need to push a new node to the dag because the change only depends on itself
self.dag.with_last_mut_of_peer(change.id.peer, |last| {
let last = last.unwrap();
assert_eq!(last.peer, change.id.peer, "peer id is not the same");
assert_eq!(
last.cnt + last.len as Counter,
change.id.counter,
"counter is not continuous"
);
assert_eq!(
last.lamport + last.len as Lamport,
change.lamport,
"lamport is not continuous"
);
last.len = (change.id.counter - last.cnt) as usize + len;
last.has_succ = false;
});
} else {
let vv = self.dag.frontiers_to_im_vv(&change.deps);
let mut pushed = false;
let node = AppDagNode {
vv: OnceCell::from(vv),
peer: change.id.peer,
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
has_succ: false,
len,
};
self.dag.with_last_mut_of_peer(change.id.peer, |last| {
if let Some(last) = last {
if change.id.counter > 0 {
assert_eq!(
last.ctr_end(),
change.id.counter,
"counter is not continuous"
);
}
if last.is_mergable(&node, &()) {
pushed = true;
last.merge(&node, &());
}
}
});
if !pushed {
self.dag.insert(node.id_start(), node);
}
for dep in change.deps.iter() {
let ans = self.dag.with_node_mut(*dep, |target| {
let target = target.unwrap();
if target.ctr_last() == dep.counter {
target.has_succ = true;
None
} else {
// We need to split the target node into two parts
// so that we can ensure the new change depends on the
// last id of a dag node.
let new_node =
target.slice(dep.counter as usize - target.cnt as usize, target.len);
target.len -= new_node.len;
Some(new_node)
}
});
if let Some(new_node) = ans {
self.dag.insert(new_node.id_start(), new_node);
}
}
}
EnsureDagNodeDepsAreAtTheEnd
self.dag.handle_new_change(change)
}
/// Trim the known part of change

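With this change, the `OpLog` method above shrinks to a one-line delegation, and `EnsureDagNodeDepsAreAtTheEnd` moves into `loro_dag`, gaining a private `PhantomData` field so that only that module can construct a value of it. The return value thus works as a proof token: holding one is static evidence that `handle_new_change` ran and left every dependency at the end of a dag node. A minimal sketch of the pattern, assuming nothing about Loro's internals (`DepsAtEndProof`, `Dag`, and `after_import` are hypothetical names):

```rust
mod dag {
    use std::marker::PhantomData;

    /// Proof token: constructible only inside this module, so holding
    /// one means `handle_new_change` has run.
    pub struct DepsAtEndProof {
        _private: PhantomData<()>,
    }

    /// Stand-in for `AppDag`.
    pub struct Dag;

    impl Dag {
        pub fn handle_new_change(&mut self) -> DepsAtEndProof {
            // ... establish the invariant here ...
            DepsAtEndProof { _private: PhantomData }
        }
    }
}

// A later step can demand the token, so the compiler guarantees the
// invariant-establishing code ran first.
fn after_import(_proof: dag::DepsAtEndProof) {}

fn main() {
    let mut d = dag::Dag;
    let proof = d.handle_new_change();
    after_import(proof);
}
```

Because `_private` is not visible outside the module, writing `dag::DepsAtEndProof { _private: PhantomData }` anywhere else fails to compile, so the token cannot be forged.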

@@ -7,8 +7,9 @@ use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, PeerID};
use once_cell::sync::OnceCell;
use rle::{HasIndex, HasLength, Mergable, Sliceable};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Display;
use std::marker::PhantomData;
use std::sync::Mutex;
use super::ChangeStore;
@@ -33,6 +34,14 @@ pub struct AppDag {
///
/// - `vv` >= `unparsed_vv`
unparsed_vv: VersionVector,
/// A set of points that are deps of some already-parsed ops, while the
/// ops at these points are not parsed yet. When they are parsed, we
/// need to make sure the dag node breaks at each given point.
unhandled_dep_points: BTreeSet<ID>,
}
pub(crate) struct EnsureDagNodeDepsAreAtTheEnd {
_private: PhantomData<()>,
}
#[derive(Debug, Clone)]
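The new `unhandled_dep_points` field exists because parsing is lazy: a dependency may point into a span whose ops have not been parsed into dag nodes yet, so the split cannot happen at insertion time. Recording the point in a `BTreeSet<ID>` lets the parser break nodes at those ids later. A simplified sketch of that bookkeeping, with stand-in `Id`/`Node` types and a hypothetical `split_at_dep_points` helper (the real `ID` and node layout differ):

```rust
use std::collections::BTreeSet;

/// Stand-in for Loro's `ID` (peer, counter); ordered by peer, then counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Id {
    peer: u64,
    counter: i32,
}

/// A run-length node covering counters `[cnt, cnt + len)` for one peer.
#[derive(Debug)]
struct Node {
    peer: u64,
    cnt: i32,
    len: i32,
}

/// Once a lazily parsed span becomes a node, split it so that every
/// recorded dep point is the last counter of some node.
fn split_at_dep_points(node: Node, points: &BTreeSet<Id>) -> Vec<Node> {
    let start = Id { peer: node.peer, counter: node.cnt };
    let end = Id { peer: node.peer, counter: node.cnt + node.len - 1 };
    let mut out = Vec::new();
    let mut cur = node.cnt;
    for p in points.range(start..=end) {
        // A dep at the node's last counter needs no split at all.
        if p.counter + 1 < node.cnt + node.len {
            out.push(Node { peer: node.peer, cnt: cur, len: p.counter + 1 - cur });
            cur = p.counter + 1;
        }
    }
    out.push(Node { peer: node.peer, cnt: cur, len: node.cnt + node.len - cur });
    out
}

fn main() {
    let mut points = BTreeSet::new();
    points.insert(Id { peer: 1, counter: 4 }); // some parsed op depends on 1@4
    let parts = split_at_dep_points(Node { peer: 1, cnt: 0, len: 10 }, &points);
    assert_eq!(parts.len(), 2); // counters [0..5) and [5..10)
}
```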
@@ -56,6 +65,7 @@ impl AppDag {
frontiers: Frontiers::default(),
vv: VersionVector::default(),
unparsed_vv: VersionVector::default(),
unhandled_dep_points: BTreeSet::new(),
}
}
@@ -71,7 +81,89 @@ impl AppDag {
self.vv.is_empty()
}
pub fn insert(&self, id: ID, node: AppDagNode) {
pub(super) fn handle_new_change(&mut self, change: &Change) -> EnsureDagNodeDepsAreAtTheEnd {
let len = change.content_len();
self.update_frontiers(change.id_last(), &change.deps);
if change.deps_on_self() {
// No need to push a new node to the dag because the change only depends on itself
self.with_last_mut_of_peer(change.id.peer, |last| {
let last = last.unwrap();
assert_eq!(last.peer, change.id.peer, "peer id is not the same");
assert_eq!(
last.cnt + last.len as Counter,
change.id.counter,
"counter is not continuous"
);
assert_eq!(
last.lamport + last.len as Lamport,
change.lamport,
"lamport is not continuous"
);
last.len = (change.id.counter - last.cnt) as usize + len;
last.has_succ = false;
});
} else {
let vv = self.frontiers_to_im_vv(&change.deps);
let mut pushed = false;
let node = AppDagNode {
vv: OnceCell::from(vv),
peer: change.id.peer,
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
has_succ: false,
len,
};
self.with_last_mut_of_peer(change.id.peer, |last| {
if let Some(last) = last {
if change.id.counter > 0 {
assert_eq!(
last.ctr_end(),
change.id.counter,
"counter is not continuous"
);
}
if last.is_mergable(&node, &()) {
pushed = true;
last.merge(&node, &());
}
}
});
if !pushed {
self.insert(node.id_start(), node);
}
for dep in change.deps.iter() {
let ans = self.with_node_mut(*dep, |target| {
let target = target.unwrap();
if target.ctr_last() == dep.counter {
target.has_succ = true;
None
} else {
// We need to split the target node into two parts
// so that we can ensure the new change depends on the
// last id of a dag node.
let new_node =
target.slice(dep.counter as usize - target.cnt as usize, target.len);
target.len -= new_node.len;
Some(new_node)
}
});
if let Some(new_node) = ans {
self.insert(new_node.id_start(), new_node);
}
}
}
EnsureDagNodeDepsAreAtTheEnd {
_private: PhantomData,
}
}
fn insert(&self, id: ID, node: AppDagNode) {
self.map.lock().unwrap().insert(id, node);
}
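When a change depends on other peers, the node's version vector is computed from `frontiers_to_im_vv(&change.deps)`. Conceptually (this is my reading, not a quote of Loro's implementation) that is the union of the version vectors known at each dependency head, widened so that each head's own op is included. A simplified illustration with a toy `vv_of` lookup (`frontiers_to_vv` is a hypothetical mirror, not Loro's API):

```rust
use std::collections::HashMap;

type PeerId = u64;
type Counter = i32;

/// Simplified version vector: peer -> exclusive end counter.
type VersionVector = HashMap<PeerId, Counter>;

#[derive(Clone, Copy)]
struct Id {
    peer: PeerId,
    counter: Counter,
}

/// Take the pointwise max of the vv at every head, then make sure each
/// head itself is covered.
fn frontiers_to_vv(deps: &[Id], vv_of: impl Fn(Id) -> VersionVector) -> VersionVector {
    let mut out = VersionVector::new();
    for &head in deps {
        for (peer, end) in vv_of(head) {
            let e = out.entry(peer).or_insert(0);
            *e = (*e).max(end);
        }
        // The head op itself is part of the new node's causal past.
        let e = out.entry(head.peer).or_insert(0);
        *e = (*e).max(head.counter + 1);
    }
    out
}

fn main() {
    // Toy lookup: each head only knows the ops before it on its own peer.
    let vv_of = |id: Id| VersionVector::from([(id.peer, id.counter)]);
    let deps = [Id { peer: 1, counter: 4 }, Id { peer: 2, counter: 9 }];
    let vv = frontiers_to_vv(&deps, vv_of);
    assert_eq!(vv[&1], 5); // head 1@4 is included
    assert_eq!(vv[&2], 10);
}
```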
@@ -163,6 +255,7 @@ impl AppDag {
frontiers: self.frontiers.clone(),
vv: self.vv.clone(),
unparsed_vv: self.unparsed_vv.clone(),
unhandled_dep_points: self.unhandled_dep_points.clone(),
}
}

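The `deps_on_self` fast path above never allocates a node: a change that continues its own peer's history simply extends the peer's last node in place, after asserting that the counter and lamport runs are contiguous. A toy version of that branch (stand-in types; `extend_last` is a hypothetical helper mirroring the asserts in `handle_new_change`):

```rust
type Counter = i32;
type Lamport = u32;

#[derive(Debug)]
struct DagNode {
    peer: u64,
    cnt: Counter,
    lamport: Lamport,
    len: usize,
    has_succ: bool,
}

struct Change {
    peer: u64,
    counter: Counter,
    lamport: Lamport,
    len: usize,
}

/// Absorb `change` into `last`, mirroring the self-dependent branch.
fn extend_last(last: &mut DagNode, change: &Change) {
    assert_eq!(last.peer, change.peer, "peer id is not the same");
    assert_eq!(last.cnt + last.len as Counter, change.counter, "counter is not continuous");
    assert_eq!(last.lamport + last.len as Lamport, change.lamport, "lamport is not continuous");
    last.len = (change.counter - last.cnt) as usize + change.len;
    // The extended run is now the head of this peer again.
    last.has_succ = false;
}

fn main() {
    let mut last = DagNode { peer: 7, cnt: 0, lamport: 0, len: 3, has_succ: true };
    extend_last(&mut last, &Change { peer: 7, counter: 3, lamport: 3, len: 2 });
    assert_eq!(last.len, 5);
}
```

Merging in place like this keeps the dag run-length encoded, which is the same reason the non-self branch tries `is_mergable`/`merge` before inserting a fresh node.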

@@ -8,7 +8,7 @@
"check-all": "cargo hack check --each-feature",
"build": "cargo build",
"test": "cargo nextest run --features=test_utils && cargo test --doc",
"test-all": "deno task test & deno task test-wasm",
"test-all": "nr test && nr test-wasm",
"test-wasm": "cd crates/loro-wasm && deno task dev && cd ../../loro-js && pnpm i && pnpm run test",
"coverage": "mkdir -p coverage && cargo llvm-cov nextest --features test_utils --lcov > coverage/lcov-nextest.info && cargo llvm-cov report",
"release-wasm": "cd crates/loro-wasm && deno task release && cd ../../loro-js && pnpm i && pnpm build && pnpm run test",