diff --git a/crates/loro-internal/src/log_store/import.rs b/crates/loro-internal/src/log_store/import.rs index bebbea55..3564c737 100644 --- a/crates/loro-internal/src/log_store/import.rs +++ b/crates/loro-internal/src/log_store/import.rs @@ -2,7 +2,7 @@ use crate::change::Change; use crate::hierarchy::Hierarchy; use crate::id::{ClientID, Counter, ID}; use crate::op::RemoteOp; -use crate::span::{CounterSpan, HasCounterSpan}; +use crate::span::{CounterSpan, HasCounter, HasCounterSpan}; use crate::version::PatchedVersionVector; use crate::LogStore; use crate::{ @@ -18,7 +18,7 @@ use tracing::instrument; use fxhash::{FxHashMap, FxHashSet}; -use rle::{HasLength, RleVecWithIndex}; +use rle::{slice_vec_by, HasLength, RleVecWithIndex, Sliceable}; use crate::{ container::{registry::ContainerInstance, ContainerID, ContainerTrait}, @@ -201,6 +201,12 @@ impl LogStore { .entry(*client_id) .or_insert_with(|| RleVecWithIndex::new_cfg(cfg.clone())); for change in inner_changes { + // if let Some(last) = rle.last() { + // assert_eq!( + // last.id.counter + last.atom_len() as Counter, + // change.id.counter + // ) + // } rle.push(change); } } @@ -359,7 +365,10 @@ impl LogStore { } } - fn process_and_queue_changes(&mut self, changes: RemoteClientChanges) -> RemoteClientChanges { + fn process_and_queue_changes( + &mut self, + mut changes: RemoteClientChanges, + ) -> RemoteClientChanges { let mut latest_vv = self.get_vv().clone(); let mut retain_changes = FxHashMap::default(); @@ -373,19 +382,20 @@ impl LogStore { } else { // Changes will be sorted by lamport. If the first change cannot be applied, then all subsequent changes with the same client id cannot be applied either. // we cache these client id. + // self.tailor_changes(&mut changes); let mut pending_clients = FxHashSet::default(); changes .into_values() .flat_map(|c| c.into_iter()) // sort changes by lamport from small to large .sorted_by(|a, b| a.lamport.cmp(&b.lamport)) - .for_each(|c| { + .for_each(|mut c| { let c_client_id = c.id.client_id; if pending_clients.contains(&c_client_id) { self.pending_changes.get_mut(&c_client_id).unwrap().push(c); return; } - match can_remote_change_be_applied(&latest_vv, &c) { + match can_remote_change_be_applied(&latest_vv, &mut c) { ChangeApplyState::Directly => { latest_vv.set_end(c.id_end()); retain_changes @@ -409,7 +419,6 @@ impl LogStore { } }); } - retain_changes } @@ -424,7 +433,7 @@ impl LogStore { .into_iter() .sorted_by(|a, b| a.lamport.cmp(&b.lamport)) .peekable(); - while let Some(peek_c) = may_apply_iter.peek() { + while let Some(peek_c) = may_apply_iter.peek_mut() { match can_remote_change_be_applied(latest_vv, peek_c) { ChangeApplyState::Directly => { let c = may_apply_iter.next().unwrap(); @@ -451,8 +460,30 @@ impl LogStore { } } } + + fn tailor_changes(&mut self, changes: &mut RemoteClientChanges) { + changes.retain(|_, v| !v.is_empty()); + for (client_id, changes) in changes.iter_mut() { + let self_end_ctr = self.vv.get(client_id).copied().unwrap_or(0); + let other_start_ctr = changes.first().unwrap().ctr_start(); + match other_start_ctr.cmp(&self_end_ctr) { + std::cmp::Ordering::Less => { + *changes = slice_vec_by( + changes, + |x| x.id.counter as usize, + self_end_ctr as usize, + usize::MAX, + ); + } + std::cmp::Ordering::Equal => {} + std::cmp::Ordering::Greater => {} + } + } + changes.retain(|_, v| !v.is_empty()); + } } +#[derive(Debug)] enum ChangeApplyState { Existing, Directly, @@ -460,14 +491,17 @@ enum ChangeApplyState { Future(ClientID), } -fn can_remote_change_be_applied(vv: &VersionVector, change: &Change) -> ChangeApplyState { +fn can_remote_change_be_applied( + vv: &VersionVector, + change: &mut Change, +) -> ChangeApplyState { let change_client_id = change.id.client_id; let CounterSpan { start, end } = change.ctr_span(); let vv_latest_ctr = vv.get(&change_client_id).copied().unwrap_or(0); if vv_latest_ctr < start { return ChangeApplyState::Future(change_client_id); } - if vv_latest_ctr >= end { + if vv_latest_ctr >= end || start == end { return ChangeApplyState::Existing; } for dep in &change.deps { @@ -476,12 +510,17 @@ fn can_remote_change_be_applied(vv: &VersionVector, change: &Change) - return ChangeApplyState::Future(dep.client_id); } } + + if start < vv_latest_ctr { + *change = change.slice((vv_latest_ctr - start) as usize, (end - start) as usize); + } + ChangeApplyState::Directly } #[cfg(test)] mod test { - use crate::{LoroCore, VersionVector}; + use crate::{LoroCore, Transact, VersionVector}; #[test] fn import_pending() { @@ -615,4 +654,24 @@ mod test { c.decode(&updates_a123).unwrap(); assert_eq!(c.to_json(), a.to_json()); } + + #[test] + fn applied_change_filter() { + let mut a = LoroCore::new(Default::default(), Some(1)); + let mut b = LoroCore::new(Default::default(), Some(2)); + let mut list_a = a.get_list("list"); + let mut list_b = b.get_list("list"); + { + let txn = a.transact(); + list_a.insert(&txn, 0, "1").unwrap(); + list_a.insert(&txn, 1, "1").unwrap(); + } + b.decode(&a.encode_from(Default::default())).unwrap(); + { + let txn = a.transact(); + list_a.insert(&txn, 2, "1").unwrap(); + list_a.insert(&txn, 3, "1").unwrap(); + } + b.decode(&a.encode_from(Default::default())).unwrap(); + } }