fix: map version checkout err (#101)

This commit is contained in:
Zixuan Chen 2023-08-04 22:41:02 +08:00 committed by GitHub
parent 640828bf26
commit 72cc8c6ed5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 317 additions and 20 deletions

View file

@ -21,7 +21,7 @@
],
"rust-analyzer.runnableEnv": {
"RUST_BACKTRACE": "full",
// "DEBUG": "*"
"DEBUG": "*"
},
"rust-analyzer.cargo.features": ["test_utils"],
"editor.defaultFormatter": "rust-lang.rust-analyzer",

View file

@ -1,10 +1,11 @@
use std::{cmp::Ordering, collections::BinaryHeap};
use debug_log::debug_dbg;
use enum_dispatch::enum_dispatch;
use fxhash::{FxHashMap, FxHashSet};
use loro_common::{ContainerType, HasIdSpan, PeerID, ID};
use crate::{
change::{Change, Lamport},
container::idx::ContainerIdx,
delta::{MapDelta, MapValue},
event::Diff,
@ -65,7 +66,7 @@ impl DiffCalculator {
ContainerDiffCalculator::Text(TextDiffCalculator::default())
}
crate::ContainerType::Map => {
ContainerDiffCalculator::Map(MapDiffCalculator::default())
ContainerDiffCalculator::Map(MapDiffCalculator::new(op.container))
}
crate::ContainerType::List => {
ContainerDiffCalculator::List(ListDiffCalculator::default())
@ -138,8 +139,8 @@ struct TextDiffCalculator {
tracker: Tracker,
}
#[derive(Default)]
/// Diff calculator for a single Map container: records every value seen for
/// each key so diffs between arbitrary versions can be computed.
struct MapDiffCalculator {
    // The container this calculator is bound to; passed to
    // `OpLog::lookup_map_values_at` for keys missing from `grouped`.
    idx: ContainerIdx,
    // Values seen so far, grouped per map key.
    grouped: FxHashMap<InternalString, GroupedValues>,
}
@ -175,6 +176,13 @@ impl GroupedValues {
}
impl MapDiffCalculator {
/// Create a calculator bound to the map container identified by `idx`.
pub(crate) fn new(idx: ContainerIdx) -> Self {
    let grouped = Default::default();
    Self { grouped, idx }
}
fn checkout(&mut self, vv: &VersionVector) {
for (_, g) in self.grouped.iter_mut() {
g.checkout(vv)
@ -193,7 +201,6 @@ impl DiffCalculatorTrait for MapDiffCalculator {
) {
let map = op.op().content.as_map().unwrap();
let value = oplog.arena.get_value(map.value as usize);
debug_dbg!(&value);
self.grouped
.entry(map.key.clone())
.or_default()
@ -205,7 +212,7 @@ impl DiffCalculatorTrait for MapDiffCalculator {
fn calculate_diff(
&mut self,
_oplog: &super::oplog::OpLog,
oplog: &super::oplog::OpLog,
from: &crate::VersionVector,
to: &crate::VersionVector,
) -> Diff {
@ -233,27 +240,143 @@ impl DiffCalculatorTrait for MapDiffCalculator {
}
let mut updated = FxHashMap::with_capacity_and_hasher(changed.len(), Default::default());
let mut extra_lookup = Vec::new();
for key in changed {
let value = self
if let Some(value) = self
.grouped
.get(&key)
.unwrap()
.applied_or_smaller
.peek()
.cloned()
.unwrap_or_else(|| MapValue {
counter: 0,
value: None,
lamport: (0, 0),
});
{
updated.insert(key, value);
} else {
extra_lookup.push(key);
}
}
updated.insert(key, value);
if !extra_lookup.is_empty() {
// PERF: the first time we do this, it may take a long time:
// it will traverse the whole history in O(n) time
let ans = oplog.lookup_map_values_at(self.idx, &extra_lookup, to);
for (k, v) in extra_lookup.into_iter().zip(ans.into_iter()) {
updated.insert(
k,
v.unwrap_or_else(|| MapValue {
counter: 0,
value: None,
lamport: (0, 0),
}),
);
}
}
Diff::NewMap(MapDelta { updated })
}
}
/// A map write compressed down to its identifying metadata (no payload).
///
/// The derived `Ord` is lexicographic over (lamport, peer, counter), so the
/// maximum element is the lamport-latest write, with the peer id breaking ties.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct CompactMapValue {
    lamport: Lamport,
    peer: PeerID,
    counter: Counter,
}
impl HasId for CompactMapValue {
    /// The ID (peer + counter) of the op that produced this value.
    fn id_start(&self) -> ID {
        ID::new(self.peer, self.counter)
    }
}
/// All compact values recorded for one map key, partitioned relative to the
/// last checked-out version.
#[derive(Debug, Default)]
struct CompactGroupedValues {
    /// Each value in this set is either included in the current version, or —
    /// when it is not at the peak of the heap — merely concurrent with it.
    /// `checkout` only guarantees that the peak itself is included.
    applied_or_smaller: BinaryHeap<CompactMapValue>,
    /// The values that are guaranteed not in the current version (they are from the future).
    pending: Vec<CompactMapValue>,
}
impl CompactGroupedValues {
    /// Re-partition the recorded values against `vv`: promote pending values
    /// whose op `vv` includes, then demote heap peaks that `vv` does not
    /// include, so that afterwards the peak (if any) is visible at `vv`.
    fn checkout(&mut self, vv: &VersionVector) {
        // Promote: pending values now covered by `vv` join the heap.
        self.pending.retain(|v| {
            if vv.includes_id(v.id_start()) {
                self.applied_or_smaller.push(*v);
                false
            } else {
                true
            }
        });

        // Demote: pop non-included peaks. Non-included entries deeper in the
        // heap may remain — `peek` only ever reads the top, so fixing the top
        // is sufficient (see the invariant on `applied_or_smaller`).
        while let Some(top) = self.applied_or_smaller.peek() {
            if vv.includes_id(top.id_start()) {
                break;
            } else {
                let top = self.applied_or_smaller.pop().unwrap();
                self.pending.push(top);
            }
        }
    }

    /// The greatest (lamport, peer, counter) value visible at the
    /// checked-out version, if any.
    fn peek(&self) -> Option<CompactMapValue> {
        // `CompactMapValue` is `Copy`; `copied` is the idiomatic form of
        // `cloned` here (clippy::clone_on_copy).
        self.applied_or_smaller.peek().copied()
    }
}
/// Lazily-built cache of map values across ALL map containers, used to answer
/// "what was the value of this key at version X" lookups.
#[derive(Default)]
pub(crate) struct GlobalMapDiffCalculator {
    // Per-container, per-key history of compact map values.
    maps: FxHashMap<ContainerIdx, FxHashMap<InternalString, CompactGroupedValues>>,
    /// Version up to which changes have already been folded into `maps`.
    pub(crate) last_vv: VersionVector,
}
impl GlobalMapDiffCalculator {
    /// Fold one change into the cache; changes already covered by `last_vv`
    /// are skipped.
    pub fn process_change(&mut self, change: &Change) {
        // If the change's last op is covered, treat the whole change as seen.
        // NOTE(review): this assumes changes are fed in an order where a
        // change is never partially covered by `last_vv` — confirm callers.
        if self.last_vv.includes_id(change.id_last()) {
            return;
        }

        for op in change.ops.iter() {
            if op.container.get_type() == ContainerType::Map {
                let key = op.content.as_map().unwrap().key.clone();
                self.maps
                    .entry(op.container)
                    .or_default()
                    .entry(key)
                    .or_default()
                    .pending
                    .push(CompactMapValue {
                        // Op lamport = change's start lamport + the op's
                        // counter offset within the change.
                        lamport: (op.counter - change.id.counter) as Lamport + change.lamport,
                        peer: change.id.peer,
                        counter: op.counter,
                    });
            }
        }

        self.last_vv.extend_to_include_end_id(change.id_end());
    }

    /// The value of `key` in `container` at version `vv`, or `None` when the
    /// key has no visible value there (or the container/key is unknown).
    pub fn get_value_at(
        &mut self,
        container: ContainerIdx,
        key: &InternalString,
        vv: &VersionVector,
        oplog: &OpLog,
    ) -> Option<MapValue> {
        let group = self.maps.get_mut(&container)?.get_mut(key)?;
        // Align the group's partition with `vv`, then take the winning write.
        group.checkout(vv);
        let peek = group.peek()?;
        // Resolve the actual payload by looking the op back up in the oplog;
        // `unwrap` relies on every cached id coming from a real change.
        let op = oplog.lookup_op(peek.id_start()).unwrap();
        let value_idx = op.content.as_map().unwrap().value;
        let value = oplog.arena.get_value(value_idx as usize);
        Some(MapValue {
            counter: peek.counter,
            value,
            lamport: (peek.lamport, peek.peer),
        })
    }
}
#[derive(Default)]
struct ListDiffCalculator {
tracker: Tracker,

View file

@ -5,6 +5,7 @@ use crate::{
list::list_op::{DeleteSpan, ListOp},
text::text_content::ListSlice,
},
delta::MapValue,
txn::EventHint,
};
use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue};
@ -13,16 +14,19 @@ use std::{
sync::{Mutex, Weak},
};
#[derive(Clone)]
pub struct TextHandler {
container_idx: ContainerIdx,
state: Weak<Mutex<DocState>>,
}
#[derive(Clone)]
pub struct MapHandler {
container_idx: ContainerIdx,
state: Weak<Mutex<DocState>>,
}
#[derive(Clone)]
pub struct ListHandler {
container_idx: ContainerIdx,
state: Weak<Mutex<DocState>>,
@ -394,6 +398,23 @@ impl ListHandler {
a.get(index).cloned()
})
}
/// Run `f` on every element of the list, in order, while holding the
/// document-state lock.
pub fn for_each<I>(&self, f: I)
where
    I: Fn(&LoroValue),
{
    let state_arc = self.state.upgrade().unwrap();
    let mut doc_state = state_arc.lock().unwrap();
    doc_state.with_state(self.container_idx, |state| {
        let list = state.as_list_state().unwrap();
        list.iter().for_each(|value| f(value));
    })
}
}
impl MapHandler {
@ -457,6 +478,23 @@ impl MapHandler {
)
}
/// Run `f` on every (key, value) entry of the map while holding the
/// document-state lock. Iteration order is unspecified.
pub fn for_each<I>(&self, f: I)
where
    I: Fn(&str, &MapValue),
{
    let state_arc = self.state.upgrade().unwrap();
    let mut doc_state = state_arc.lock().unwrap();
    doc_state.with_state(self.container_idx, |state| {
        let map = state.as_map_state().unwrap();
        for (key, value) in map.iter() {
            f(key, value);
        }
    })
}
pub fn get_value(&self) -> LoroValue {
self.state
.upgrade()

View file

@ -115,6 +115,16 @@ impl LoroDoc {
self.txn_with_origin("")
}
#[inline(always)]
/// Run `f` inside a fresh transaction and commit it.
///
/// # Errors
/// Returns an error if the transaction cannot be created or the commit
/// fails. (Previously a failure to create the transaction would panic via
/// `unwrap` even though the signature already returns `LoroResult`.)
pub fn with_txn<F>(&self, f: F) -> LoroResult<()>
where
    // `FnOnce` is strictly more general than `Fn`; every existing caller
    // still compiles.
    F: FnOnce(&mut Transaction),
{
    // Propagate the error instead of panicking.
    let mut txn = self.txn()?;
    f(&mut txn);
    txn.commit()
}
/// Create a new transaction with specified origin.
///
/// The origin will be propagated to the events.

View file

@ -4,21 +4,25 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use fxhash::FxHashMap;
use rle::{HasLength, RleVec};
// use tabled::measurment::Percent;
use crate::change::{Change, Lamport, Timestamp};
use crate::container::idx::ContainerIdx;
use crate::container::list::list_op;
use crate::dag::DagUtils;
use crate::delta::MapValue;
use crate::diff_calc::GlobalMapDiffCalculator;
use crate::encoding::{decode_oplog, encode_oplog, EncodeMode};
use crate::encoding::{ClientChanges, RemoteClientChanges};
use crate::id::{Counter, PeerID, ID};
use crate::op::{RawOpContent, RemoteOp};
use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use crate::{InternalString, LoroError};
use super::arena::SharedArena;
@ -40,6 +44,7 @@ pub struct OpLog {
/// A change can be imported only when all its deps are already imported.
/// Key is the ID of the missing dep
pending_changes: FxHashMap<ID, Vec<Change>>,
map_diff_calc: Arc<Mutex<GlobalMapDiffCalculator>>,
}
/// [AppDag] maintains the causal graph of the app.
@ -70,6 +75,7 @@ impl Clone for OpLog {
next_lamport: self.next_lamport,
latest_timestamp: self.latest_timestamp,
pending_changes: Default::default(),
map_diff_calc: Default::default(),
}
}
}
@ -94,6 +100,7 @@ impl OpLog {
next_lamport: 0,
latest_timestamp: Timestamp::default(),
pending_changes: Default::default(),
map_diff_calc: Default::default(),
}
}
@ -101,10 +108,8 @@ impl OpLog {
Self {
dag: AppDag::default(),
arena,
changes: ClientChanges::default(),
next_lamport: 0,
latest_timestamp: Timestamp::default(),
pending_changes: Default::default(),
..Default::default()
}
}
@ -417,6 +422,11 @@ impl OpLog {
})
}
/// Find the op containing `id`, or `None` if the id is not in the oplog.
pub(crate) fn lookup_op(&self, id: ID) -> Option<&crate::op::Op> {
    let change = self.lookup_change(id)?;
    let found = change.ops.get_by_atom_index(id.counter)?;
    Some(found.element)
}
pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
encode_oplog(self, EncodeMode::Auto(vv.clone()))
}
@ -425,6 +435,27 @@ impl OpLog {
decode_oplog(self, data)
}
/// Iterates over all changes between `from` and `to` peer by peer
pub(crate) fn for_each_change_within(
    &self,
    from: &VersionVector,
    to: &VersionVector,
    mut f: impl FnMut(&Change),
) {
    for (peer, changes) in self.changes.iter() {
        // A peer absent from a version vector is treated as counter 0.
        let from_cnt = from.get(peer).copied().unwrap_or(0);
        let to_cnt = to.get(peer).copied().unwrap_or(0);
        if from_cnt == to_cnt {
            continue;
        }

        let Some(result) = changes.get_by_atom_index(from_cnt) else { continue };
        // NOTE(review): iteration runs from the change containing `from_cnt`
        // to the END of this peer's change list — `to_cnt` is only used to
        // skip unchanged peers and does not bound the upper end. The caller
        // (`lookup_map_values_at`) appears to tolerate over-delivery because
        // `process_change` is idempotent per change; confirm this is intended.
        for i in result.merged_index..changes.vec().len() {
            f(&changes.vec()[i])
        }
    }
}
/// Iterates over all changes from the LCA (common ancestors) to the merged version of `from` and `to`, in causal order
///
/// The iterator will include a version vector when the change is applied
@ -533,6 +564,28 @@ impl OpLog {
println!("total atom ops: {}", total_atom_ops);
println!("total dag node: {}", total_dag_node);
}
/// Look up the values of the `extra_lookup` keys in map container `idx` at
/// version `to`. Each slot in the returned vec corresponds to the key at the
/// same index; `None` means the key has no visible value at `to`.
// PERF: this is slow. The first call must traverse all changes to build the
// cache; later calls only fold in changes not yet covered.
pub(crate) fn lookup_map_values_at(
    &self,
    idx: ContainerIdx,
    extra_lookup: &[InternalString],
    to: &VersionVector,
) -> Vec<Option<MapValue>> {
    let mut map_diff_calc = self.map_diff_calc.lock().unwrap();
    // `partial_cmp` yields `None` for concurrent versions, so the cache is
    // also advanced when `to` is concurrent with (not just ahead of) it.
    if to.partial_cmp(&map_diff_calc.last_vv) != Some(Ordering::Less) {
        let from = map_diff_calc.last_vv.clone();
        self.for_each_change_within(&from, to, |change| map_diff_calc.process_change(change));
    }

    // Resolve each requested key at `to` (clippy::let_and_return removed).
    extra_lookup
        .iter()
        .map(|key| map_diff_calc.get_value_at(idx, key, to, self))
        .collect()
}
}
impl Default for OpLog {

View file

@ -244,7 +244,6 @@ impl DocState {
if self.is_recording() {
self.record_diff(diff)
}
debug_dbg!(self.get_deep_value());
}
pub fn apply_local_op(&mut self, op: RawOp) -> LoroResult<()> {

View file

@ -138,7 +138,13 @@ impl MapState {
}
}
pub fn iter(&self) -> impl Iterator<Item = (&InternalString, &MapValue)> {
/// Iterate over all (key, value) entries of the map, in arbitrary order.
// NOTE(review): this exposes the concrete `hash_map::Iter` type (with the
// `string_cache` atom type spelled out) instead of `impl Iterator` —
// presumably a caller needs a nameable/ExactSizeIterator type; confirm, and
// consider a type alias to avoid leaking `string_cache` internals.
pub fn iter(
    &self,
) -> std::collections::hash_map::Iter<
    '_,
    string_cache::Atom<string_cache::EmptyStaticAtomSet>,
    MapValue,
> {
    self.map.iter()
}

View file

@ -1,5 +1,5 @@
use loro_common::ID;
use loro_internal::{version::Frontiers, LoroDoc};
use loro_internal::{version::Frontiers, LoroDoc, ToJson};
#[test]
fn test_timestamp() {
@ -45,3 +45,71 @@ fn test_checkout() {
assert_eq!(text.len_utf8(), 12);
assert_eq!(text.len_unicode(), 4);
}
// Checking out older frontiers must reproduce the map content of each version.
#[test]
fn map_checkout() {
    let mut doc = LoroDoc::new();
    let meta = doc.get_map("meta");
    // Frontier before any write — the map should be empty here.
    let v_empty = doc.oplog_frontiers();
    doc.with_txn(|txn| {
        meta.insert(txn, "key", 0.into()).unwrap();
    })
    .unwrap();
    // Frontier after "key" = 0.
    let v0 = doc.oplog_frontiers();
    doc.with_txn(|txn| {
        meta.insert(txn, "key", 1.into()).unwrap();
    })
    .unwrap();
    // Frontier after "key" = 1 (overwrites 0).
    let v1 = doc.oplog_frontiers();
    assert_eq!(meta.get_deep_value().to_json(), r#"{"key":1}"#);
    // Time-travel backwards and forwards across the captured frontiers.
    doc.checkout(&v0);
    assert_eq!(meta.get_deep_value().to_json(), r#"{"key":0}"#);
    doc.checkout(&v_empty);
    assert_eq!(meta.get_deep_value().to_json(), r#"{}"#);
    doc.checkout(&v1);
    assert_eq!(meta.get_deep_value().to_json(), r#"{"key":1}"#);
}
// Checkout must also work for versions that were created on a different peer
// and only arrived via import.
#[test]
fn map_concurrent_checkout() {
    let mut doc_a = LoroDoc::new();
    let meta_a = doc_a.get_map("meta");
    let doc_b = LoroDoc::new();
    let meta_b = doc_b.get_map("meta");

    // Peer A writes "key" = 0; concurrently peer B writes "s" = 1, then "key" = 1.
    doc_a
        .with_txn(|txn| {
            meta_a.insert(txn, "key", 0.into()).unwrap();
        })
        .unwrap();
    let va = doc_a.oplog_frontiers();
    doc_b
        .with_txn(|txn| {
            meta_b.insert(txn, "s", 1.into()).unwrap();
        })
        .unwrap();
    let vb_0 = doc_b.oplog_frontiers();
    doc_b
        .with_txn(|txn| {
            meta_b.insert(txn, "key", 1.into()).unwrap();
        })
        .unwrap();
    let vb_1 = doc_b.oplog_frontiers();

    // Merge B into A, then A overwrites "key" = 2 on top of the merged state.
    doc_a.import(&doc_b.export_snapshot()).unwrap();
    doc_a
        .with_txn(|txn| {
            meta_a.insert(txn, "key", 2.into()).unwrap();
        })
        .unwrap();
    let v_merged = doc_a.oplog_frontiers();

    // Each checkout must reproduce that version's map, including versions
    // that only ever existed on peer B (vb_0, vb_1).
    doc_a.checkout(&va);
    assert_eq!(meta_a.get_deep_value().to_json(), r#"{"key":0}"#);
    doc_a.checkout(&vb_0);
    assert_eq!(meta_a.get_deep_value().to_json(), r#"{"s":1}"#);
    doc_a.checkout(&vb_1);
    assert_eq!(meta_a.get_deep_value().to_json(), r#"{"s":1,"key":1}"#);
    doc_a.checkout(&v_merged);
    assert_eq!(meta_a.get_deep_value().to_json(), r#"{"s":1,"key":2}"#);
}