From 72cc8c6ed5bf9791dcf622d32dc87f826f0ebd60 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 4 Aug 2023 22:41:02 +0800 Subject: [PATCH] fix: map version checkout err (#101) --- .vscode/settings.json | 2 +- crates/loro-internal/src/diff_calc.rs | 147 ++++++++++++++++++-- crates/loro-internal/src/handler.rs | 38 +++++ crates/loro-internal/src/loro.rs | 10 ++ crates/loro-internal/src/oplog.rs | 61 +++++++- crates/loro-internal/src/state.rs | 1 - crates/loro-internal/src/state/map_state.rs | 8 +- crates/loro-internal/tests/test.rs | 70 +++++++++- 8 files changed, 317 insertions(+), 20 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 9b2d864d..fb923ab2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -21,7 +21,7 @@ ], "rust-analyzer.runnableEnv": { "RUST_BACKTRACE": "full", - // "DEBUG": "*" + "DEBUG": "*" }, "rust-analyzer.cargo.features": ["test_utils"], "editor.defaultFormatter": "rust-lang.rust-analyzer", diff --git a/crates/loro-internal/src/diff_calc.rs b/crates/loro-internal/src/diff_calc.rs index 68efc65a..af207add 100644 --- a/crates/loro-internal/src/diff_calc.rs +++ b/crates/loro-internal/src/diff_calc.rs @@ -1,10 +1,11 @@ use std::{cmp::Ordering, collections::BinaryHeap}; -use debug_log::debug_dbg; use enum_dispatch::enum_dispatch; use fxhash::{FxHashMap, FxHashSet}; +use loro_common::{ContainerType, HasIdSpan, PeerID, ID}; use crate::{ + change::{Change, Lamport}, container::idx::ContainerIdx, delta::{MapDelta, MapValue}, event::Diff, @@ -65,7 +66,7 @@ impl DiffCalculator { ContainerDiffCalculator::Text(TextDiffCalculator::default()) } crate::ContainerType::Map => { - ContainerDiffCalculator::Map(MapDiffCalculator::default()) + ContainerDiffCalculator::Map(MapDiffCalculator::new(op.container)) } crate::ContainerType::List => { ContainerDiffCalculator::List(ListDiffCalculator::default()) @@ -138,8 +139,8 @@ struct TextDiffCalculator { tracker: Tracker, } -#[derive(Default)] struct MapDiffCalculator { + idx: ContainerIdx, grouped: FxHashMap, } @@ -175,6 +176,13 @@ impl GroupedValues { } impl MapDiffCalculator { + pub(crate) fn new(idx: ContainerIdx) -> Self { + Self { + idx, + grouped: Default::default(), + } + } + fn checkout(&mut self, vv: &VersionVector) { for (_, g) in self.grouped.iter_mut() { g.checkout(vv) @@ -193,7 +201,6 @@ impl DiffCalculatorTrait for MapDiffCalculator { ) { let map = op.op().content.as_map().unwrap(); let value = oplog.arena.get_value(map.value as usize); - debug_dbg!(&value); self.grouped .entry(map.key.clone()) .or_default() @@ -205,7 +212,7 @@ impl DiffCalculatorTrait for MapDiffCalculator { fn calculate_diff( &mut self, - _oplog: &super::oplog::OpLog, + oplog: &super::oplog::OpLog, from: &crate::VersionVector, to: &crate::VersionVector, ) -> Diff { @@ -233,27 +240,143 @@ impl DiffCalculatorTrait for MapDiffCalculator { } let mut updated = FxHashMap::with_capacity_and_hasher(changed.len(), Default::default()); + let mut extra_lookup = Vec::new(); for key in changed { - let value = self + if let Some(value) = self .grouped .get(&key) .unwrap() .applied_or_smaller .peek() .cloned() - .unwrap_or_else(|| MapValue { - counter: 0, - value: None, - lamport: (0, 0), - }); + { + updated.insert(key, value); + } else { + extra_lookup.push(key); + } + } - updated.insert(key, value); + if !extra_lookup.is_empty() { + // PERF: the first time we do this, it may take a long time: + // it will travel the whole history with O(n) time + let ans = oplog.lookup_map_values_at(self.idx, &extra_lookup, to); + for (k, v) in extra_lookup.into_iter().zip(ans.into_iter()) { + updated.insert( + k, + v.unwrap_or_else(|| MapValue { + counter: 0, + value: None, + lamport: (0, 0), + }), + ); + } } Diff::NewMap(MapDelta { updated }) } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +struct CompactMapValue { + lamport: Lamport, + peer: PeerID, + counter: Counter, +} + +impl HasId for CompactMapValue { + fn id_start(&self) -> ID { + ID::new(self.peer, self.counter) + } +} + +#[derive(Debug, Default)] +struct CompactGroupedValues { + /// Each value in this set should be included in the current version or + /// "concurrent to the current version it is not at the peak". + applied_or_smaller: BinaryHeap, + /// The values that are guaranteed not in the current version. (they are from the future) + pending: Vec, +} + +impl CompactGroupedValues { + fn checkout(&mut self, vv: &VersionVector) { + self.pending.retain(|v| { + if vv.includes_id(v.id_start()) { + self.applied_or_smaller.push(*v); + false + } else { + true + } + }); + + while let Some(top) = self.applied_or_smaller.peek() { + if vv.includes_id(top.id_start()) { + break; + } else { + let top = self.applied_or_smaller.pop().unwrap(); + self.pending.push(top); + } + } + } + + fn peek(&self) -> Option { + self.applied_or_smaller.peek().cloned() + } +} + +#[derive(Default)] +pub(crate) struct GlobalMapDiffCalculator { + maps: FxHashMap>, + pub(crate) last_vv: VersionVector, +} + +impl GlobalMapDiffCalculator { + pub fn process_change(&mut self, change: &Change) { + if self.last_vv.includes_id(change.id_last()) { + return; + } + + for op in change.ops.iter() { + if op.container.get_type() == ContainerType::Map { + let key = op.content.as_map().unwrap().key.clone(); + self.maps + .entry(op.container) + .or_default() + .entry(key) + .or_default() + .pending + .push(CompactMapValue { + lamport: (op.counter - change.id.counter) as Lamport + change.lamport, + peer: change.id.peer, + counter: op.counter, + }); + } + } + + self.last_vv.extend_to_include_end_id(change.id_end()); + } + + pub fn get_value_at( + &mut self, + container: ContainerIdx, + key: &InternalString, + vv: &VersionVector, + oplog: &OpLog, + ) -> Option { + let group = self.maps.get_mut(&container)?.get_mut(key)?; + group.checkout(vv); + let peek = group.peek()?; + let op = oplog.lookup_op(peek.id_start()).unwrap(); + let value_idx = op.content.as_map().unwrap().value; + let value = oplog.arena.get_value(value_idx as usize); + Some(MapValue { + counter: peek.counter, + value, + lamport: (peek.lamport, peek.peer), + }) + } +} + #[derive(Default)] struct ListDiffCalculator { tracker: Tracker, diff --git a/crates/loro-internal/src/handler.rs b/crates/loro-internal/src/handler.rs index ddbad3aa..adef7ab5 100644 --- a/crates/loro-internal/src/handler.rs +++ b/crates/loro-internal/src/handler.rs @@ -5,6 +5,7 @@ use crate::{ list::list_op::{DeleteSpan, ListOp}, text::text_content::ListSlice, }, + delta::MapValue, txn::EventHint, }; use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue}; @@ -13,16 +14,19 @@ use std::{ sync::{Mutex, Weak}, }; +#[derive(Clone)] pub struct TextHandler { container_idx: ContainerIdx, state: Weak>, } +#[derive(Clone)] pub struct MapHandler { container_idx: ContainerIdx, state: Weak>, } +#[derive(Clone)] pub struct ListHandler { container_idx: ContainerIdx, state: Weak>, @@ -394,6 +398,23 @@ impl ListHandler { a.get(index).cloned() }) } + + pub fn for_each(&self, f: I) + where + I: Fn(&LoroValue), + { + self.state + .upgrade() + .unwrap() + .lock() + .unwrap() + .with_state(self.container_idx, |state| { + let a = state.as_list_state().unwrap(); + for v in a.iter() { + f(v); + } + }) + } } impl MapHandler { @@ -457,6 +478,23 @@ impl MapHandler { ) } + pub fn for_each(&self, f: I) + where + I: Fn(&str, &MapValue), + { + self.state + .upgrade() + .unwrap() + .lock() + .unwrap() + .with_state(self.container_idx, |state| { + let a = state.as_map_state().unwrap(); + for (k, v) in a.iter() { + f(k, v); + } + }) + } + pub fn get_value(&self) -> LoroValue { self.state .upgrade() diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index f4c43211..bf25277d 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -115,6 +115,16 @@ impl LoroDoc { self.txn_with_origin("") } + #[inline(always)] + pub fn with_txn(&self, f: F) -> LoroResult<()> + where + F: Fn(&mut Transaction), + { + let mut txn = self.txn().unwrap(); + f(&mut txn); + txn.commit() + } + /// Create a new transaction with specified origin. /// /// The origin will be propagated to the events. diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 2f3eb964..99cdd163 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -4,21 +4,25 @@ use std::borrow::Cow; use std::cell::RefCell; use std::cmp::Ordering; use std::rc::Rc; +use std::sync::{Arc, Mutex}; use fxhash::FxHashMap; use rle::{HasLength, RleVec}; // use tabled::measurment::Percent; use crate::change::{Change, Lamport, Timestamp}; +use crate::container::idx::ContainerIdx; use crate::container::list::list_op; use crate::dag::DagUtils; +use crate::delta::MapValue; +use crate::diff_calc::GlobalMapDiffCalculator; use crate::encoding::{decode_oplog, encode_oplog, EncodeMode}; use crate::encoding::{ClientChanges, RemoteClientChanges}; use crate::id::{Counter, PeerID, ID}; use crate::op::{RawOpContent, RemoteOp}; use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan}; use crate::version::{Frontiers, ImVersionVector, VersionVector}; -use crate::LoroError; +use crate::{InternalString, LoroError}; use super::arena::SharedArena; @@ -40,6 +44,7 @@ pub struct OpLog { /// A change can be imported only when all its deps are already imported. /// Key is the ID of the missing dep pending_changes: FxHashMap>, + map_diff_calc: Arc>, } /// [AppDag] maintains the causal graph of the app. @@ -70,6 +75,7 @@ impl Clone for OpLog { next_lamport: self.next_lamport, latest_timestamp: self.latest_timestamp, pending_changes: Default::default(), + map_diff_calc: Default::default(), } } } @@ -94,6 +100,7 @@ impl OpLog { next_lamport: 0, latest_timestamp: Timestamp::default(), pending_changes: Default::default(), + map_diff_calc: Default::default(), } } @@ -101,10 +108,8 @@ impl OpLog { Self { dag: AppDag::default(), arena, - changes: ClientChanges::default(), next_lamport: 0, - latest_timestamp: Timestamp::default(), - pending_changes: Default::default(), + ..Default::default() } } @@ -417,6 +422,11 @@ impl OpLog { }) } + pub(crate) fn lookup_op(&self, id: ID) -> Option<&crate::op::Op> { + self.lookup_change(id) + .and_then(|change| change.ops.get_by_atom_index(id.counter).map(|x| x.element)) + } + pub fn export_from(&self, vv: &VersionVector) -> Vec { encode_oplog(self, EncodeMode::Auto(vv.clone())) } @@ -425,6 +435,27 @@ impl OpLog { decode_oplog(self, data) } + /// Iterates over all changes between `from` and `to` peer by peer + pub(crate) fn for_each_change_within( + &self, + from: &VersionVector, + to: &VersionVector, + mut f: impl FnMut(&Change), + ) { + for (peer, changes) in self.changes.iter() { + let from_cnt = from.get(peer).copied().unwrap_or(0); + let to_cnt = to.get(peer).copied().unwrap_or(0); + if from_cnt == to_cnt { + continue; + } + + let Some(result) = changes.get_by_atom_index(from_cnt) else { continue }; + for i in result.merged_index..changes.vec().len() { + f(&changes.vec()[i]) + } + } + } + /// iterates over all changes between LCA(common ancestors) to the merged version of (`from` and `to`) causally /// /// Tht iterator will include a version vector when the change is applied @@ -533,6 +564,28 @@ impl OpLog { println!("total atom ops: {}", total_atom_ops); println!("total dag node: {}", total_dag_node); } + + /// lookup map values at a specific version + // PERF: this is slow. it needs to traverse all changes to build the cache for now + pub(crate) fn lookup_map_values_at( + &self, + idx: ContainerIdx, + extra_lookup: &[InternalString], + to: &VersionVector, + ) -> Vec> { + let mut map_diff_calc = self.map_diff_calc.lock().unwrap(); + if to.partial_cmp(&map_diff_calc.last_vv) != Some(Ordering::Less) { + let from = map_diff_calc.last_vv.clone(); + self.for_each_change_within(&from, to, |change| map_diff_calc.process_change(change)); + } + + let ans = extra_lookup + .iter() + .map(|x| map_diff_calc.get_value_at(idx, x, to, self)) + .collect(); + + ans + } } impl Default for OpLog { diff --git a/crates/loro-internal/src/state.rs b/crates/loro-internal/src/state.rs index 9cc30847..d616d9c5 100644 --- a/crates/loro-internal/src/state.rs +++ b/crates/loro-internal/src/state.rs @@ -244,7 +244,6 @@ impl DocState { if self.is_recording() { self.record_diff(diff) } - debug_dbg!(self.get_deep_value()); } pub fn apply_local_op(&mut self, op: RawOp) -> LoroResult<()> { diff --git a/crates/loro-internal/src/state/map_state.rs b/crates/loro-internal/src/state/map_state.rs index 89e0177d..70a801b4 100644 --- a/crates/loro-internal/src/state/map_state.rs +++ b/crates/loro-internal/src/state/map_state.rs @@ -138,7 +138,13 @@ impl MapState { } } - pub fn iter(&self) -> impl Iterator { + pub fn iter( + &self, + ) -> std::collections::hash_map::Iter< + '_, + string_cache::Atom, + MapValue, + > { self.map.iter() } diff --git a/crates/loro-internal/tests/test.rs b/crates/loro-internal/tests/test.rs index db3d24a5..7c063b9e 100644 --- a/crates/loro-internal/tests/test.rs +++ b/crates/loro-internal/tests/test.rs @@ -1,5 +1,5 @@ use loro_common::ID; -use loro_internal::{version::Frontiers, LoroDoc}; +use loro_internal::{version::Frontiers, LoroDoc, ToJson}; #[test] fn test_timestamp() { @@ -45,3 +45,71 @@ fn test_checkout() { assert_eq!(text.len_utf8(), 12); assert_eq!(text.len_unicode(), 4); } + +#[test] +fn map_checkout() { + let mut doc = LoroDoc::new(); + let meta = doc.get_map("meta"); + let v_empty = doc.oplog_frontiers(); + doc.with_txn(|txn| { + meta.insert(txn, "key", 0.into()).unwrap(); + }) + .unwrap(); + let v0 = doc.oplog_frontiers(); + doc.with_txn(|txn| { + meta.insert(txn, "key", 1.into()).unwrap(); + }) + .unwrap(); + let v1 = doc.oplog_frontiers(); + assert_eq!(meta.get_deep_value().to_json(), r#"{"key":1}"#); + doc.checkout(&v0); + assert_eq!(meta.get_deep_value().to_json(), r#"{"key":0}"#); + doc.checkout(&v_empty); + assert_eq!(meta.get_deep_value().to_json(), r#"{}"#); + doc.checkout(&v1); + assert_eq!(meta.get_deep_value().to_json(), r#"{"key":1}"#); +} + +#[test] +fn map_concurrent_checkout() { + let mut doc_a = LoroDoc::new(); + let meta_a = doc_a.get_map("meta"); + let doc_b = LoroDoc::new(); + let meta_b = doc_b.get_map("meta"); + + doc_a + .with_txn(|txn| { + meta_a.insert(txn, "key", 0.into()).unwrap(); + }) + .unwrap(); + let va = doc_a.oplog_frontiers(); + doc_b + .with_txn(|txn| { + meta_b.insert(txn, "s", 1.into()).unwrap(); + }) + .unwrap(); + let vb_0 = doc_b.oplog_frontiers(); + doc_b + .with_txn(|txn| { + meta_b.insert(txn, "key", 1.into()).unwrap(); + }) + .unwrap(); + let vb_1 = doc_b.oplog_frontiers(); + doc_a.import(&doc_b.export_snapshot()).unwrap(); + doc_a + .with_txn(|txn| { + meta_a.insert(txn, "key", 2.into()).unwrap(); + }) + .unwrap(); + + let v_merged = doc_a.oplog_frontiers(); + + doc_a.checkout(&va); + assert_eq!(meta_a.get_deep_value().to_json(), r#"{"key":0}"#); + doc_a.checkout(&vb_0); + assert_eq!(meta_a.get_deep_value().to_json(), r#"{"s":1}"#); + doc_a.checkout(&vb_1); + assert_eq!(meta_a.get_deep_value().to_json(), r#"{"s":1,"key":1}"#); + doc_a.checkout(&v_merged); + assert_eq!(meta_a.get_deep_value().to_json(), r#"{"s":1,"key":2}"#); +}