fix: remove changes error freeze behavior

and cover more cases that can be applied directly
This commit is contained in:
Zixuan Chen 2022-11-09 13:53:12 +08:00
parent 9733f24855
commit 58fb7de26c
8 changed files with 90 additions and 56 deletions

View file

@ -44,5 +44,7 @@ fn main() {
let b = text.text_len();
assert_eq!(a, b);
}
loro_b.debug_inspect();
loro.debug_inspect();
println!("Elapsed {}ms", start.elapsed().as_millis());
}

View file

@ -48,7 +48,6 @@ impl<O> Change<O> {
id: ID,
lamport: Lamport,
timestamp: Timestamp,
_freezed: bool,
) -> Self {
Change {
ops,
@ -83,13 +82,11 @@ impl<O: Mergable + HasLength + HasIndex> HasLength for Change<O> {
pub struct ChangeMergeCfg {
pub max_change_length: usize,
pub max_change_interval: usize,
pub from_this_client: bool,
}
impl ChangeMergeCfg {
pub fn new(from_this: bool) -> Self {
pub fn new() -> Self {
ChangeMergeCfg {
from_this_client: from_this,
max_change_length: 1024,
max_change_interval: 60,
}
@ -101,7 +98,6 @@ impl Default for ChangeMergeCfg {
Self {
max_change_length: 1024,
max_change_interval: 60,
from_this_client: false,
}
}
}
@ -112,10 +108,6 @@ impl Mergable<ChangeMergeCfg> for Change {
}
fn is_mergable(&self, other: &Self, cfg: &ChangeMergeCfg) -> bool {
if !cfg.from_this_client {
return false;
}
if other.deps.is_empty() || !(other.deps.len() == 1 && self.id_last() == other.deps[0]) {
return false;
}

View file

@ -152,38 +152,70 @@ impl Container for TextContainer {
// TODO: may reduce following two into one op
let common_ancestors = store.find_common_ancestor(&[new_op_id], &self.head);
if common_ancestors == self.head {
let latest_head: SmallVec<[ID; 2]> = smallvec![new_op_id];
let latest_head = smallvec![new_op_id];
let path = store.find_path(&self.head, &latest_head);
if path.right.len() == 1 {
for iter in store.iter_partial(&self.head, path.right) {
let change = iter
.data
.slice(iter.slice.start as usize, iter.slice.end as usize);
assert!(iter.retreat.is_empty());
assert!(iter.forward.is_empty());
for op in change.ops.iter() {
if op.container == self_idx {
match &op.content {
OpContent::Normal {
content: InsertContent::List(op),
} => match op {
ListOp::Insert { slice, pos } => {
self.state.insert(*pos, slice.as_slice().unwrap().clone())
}
ListOp::Delete(span) => self.state.delete_range(
Some(span.start() as usize),
Some(span.end() as usize),
),
},
_ => unreachable!(),
// linear updates, we can apply them directly
let start = self.vv.get(&new_op_id.client_id).copied().unwrap_or(0);
for op in store.iter_ops_at_id_span(
IdSpan::new(new_op_id.client_id, start, new_op_id.counter + 1),
self.id.clone(),
) {
let op = op.get_sliced();
match &op.content {
OpContent::Normal {
content: InsertContent::List(op),
} => match op {
ListOp::Insert { slice, pos } => {
self.state.insert(*pos, slice.as_slice().unwrap().clone())
}
}
ListOp::Delete(span) => self.state.delete_range(
Some(span.start() as usize),
Some(span.end() as usize),
),
},
_ => unreachable!(),
}
}
self.head = smallvec![new_op_id];
self.head = latest_head;
self.vv.set_last(new_op_id);
return;
} else {
let path: Vec<_> = store.iter_partial(&self.head, path.right).collect();
if path
.iter()
.all(|x| x.forward.is_empty() && x.retreat.is_empty())
{
// if we don't need to retreat or forward, we can update the state directly
for iter in path {
let change = iter
.data
.slice(iter.slice.start as usize, iter.slice.end as usize);
for op in change.ops.iter() {
if op.container == self_idx {
match &op.content {
OpContent::Normal {
content: InsertContent::List(op),
} => match op {
ListOp::Insert { slice, pos } => self
.state
.insert(*pos, slice.as_slice().unwrap().clone()),
ListOp::Delete(span) => self.state.delete_range(
Some(span.start() as usize),
Some(span.end() as usize),
),
},
_ => unreachable!(),
}
}
}
}
self.head = latest_head;
self.vv.set_last(new_op_id);
return;
}
}
}
@ -214,7 +246,6 @@ impl Container for TextContainer {
};
// stage 1
// TODO: need a better mechanism to track the head (KEEP IT IN TRACKER?)
let path = store.find_path(&head, &latest_head);
debug_log!("path={:?}", &path.right);
for iter in store.iter_partial(&head, path.right) {

View file

@ -277,7 +277,7 @@ impl LogStore {
self.vv.set_end(change.id_end());
self.changes
.entry(self.this_client_id)
.or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new(true)))
.or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new()))
.push(change);
debug_log!("CHANGES---------------- site {}", self.this_client_id);

View file

@ -5,10 +5,12 @@ use crate::change::Lamport;
use crate::id::ClientID;
use crate::id::ContainerIdx;
use crate::op::RichOp;
use crate::span::HasCounter;
use crate::span::HasId;
use crate::span::IdSpan;
use fxhash::FxHashMap;
use rle::HasLength;
use crate::change::ChangeMergeCfg;
@ -68,7 +70,11 @@ impl<'a> OpSpanIter<'a> {
container,
changes,
change_index,
op_index: 0,
op_index: rle_changes[change_index]
.ops
.get(target_span.counter.start)
.unwrap()
.merged_index,
}
}
}
@ -85,13 +91,22 @@ impl<'a> Iterator for OpSpanIter<'a> {
let change = &self.changes[self.change_index];
let ops = change.ops.vec();
if let Some(op) = ops.get(self.op_index) {
if op.counter >= self.span.counter.end {
return None;
}
self.op_index += 1;
if op.container != self.container {
self.op_index += 1;
continue;
}
let start = (self.span.counter.min() - op.counter).max(0) as usize;
let end = ((self.span.counter.end() - op.counter) as usize).min(op.atom_len());
assert!(start < end, "{:?} {:#?}", self.span, op);
return Some(RichOp {
op,
start,
end,
lamport: (op.counter - change.id.counter) as Lamport + change.lamport,
timestamp: change.timestamp,
});

View file

@ -198,6 +198,14 @@ pub struct RichOp<'a> {
pub op: &'a Op,
pub lamport: Lamport,
pub timestamp: Timestamp,
pub start: usize,
pub end: usize,
}
impl<'a> RichOp<'a> {
pub fn get_sliced(&self) -> Op {
self.op.slice(self.start, self.end)
}
}
impl HasIndex for Op {

View file

@ -28,7 +28,7 @@ use crate::{
/// see also [im].
#[repr(transparent)]
#[derive(Debug, Clone)]
pub struct VersionVector(ImHashMap<ClientID, Counter>);
pub struct VersionVector(FxHashMap<ClientID, Counter>);
impl PartialEq for VersionVector {
fn eq(&self, other: &Self) -> bool {
@ -43,7 +43,7 @@ impl PartialEq for VersionVector {
impl Eq for VersionVector {}
impl Deref for VersionVector {
type Target = ImHashMap<ClientID, Counter>;
type Target = FxHashMap<ClientID, Counter>;
fn deref(&self) -> &Self::Target {
&self.0
@ -250,7 +250,7 @@ impl VersionVector {
#[inline]
pub fn new() -> Self {
Self(ImHashMap::new())
Self(Default::default())
}
/// set the inclusive ending point. target id will be included by self
@ -377,7 +377,7 @@ impl Default for VersionVector {
impl From<FxHashMap<ClientID, Counter>> for VersionVector {
fn from(map: FxHashMap<ClientID, Counter>) -> Self {
let mut im_map = ImHashMap::new();
let mut im_map = FxHashMap::default();
for (client_id, counter) in map {
im_map.insert(client_id, counter);
}

View file

@ -86,21 +86,7 @@ impl<T: Mergable<Cfg> + HasLength, Cfg> RleVecWithIndex<T, Cfg> {
return None;
}
let mut start = 0;
let mut end = self.index.len() - 1;
while start < end {
let mid = (start + end) / 2;
if self.index[mid] == index {
start = mid;
break;
}
if self.index[mid] < index {
start = mid + 1;
} else {
end = mid;
}
}
let mut start = self.index.binary_search(&index).unwrap_or_else(|x| x);
if index < self.index[start] {
start -= 1;