diff --git a/crates/loro-internal/src/log_store.rs b/crates/loro-internal/src/log_store.rs index 095ca9fe..4910a0ce 100644 --- a/crates/loro-internal/src/log_store.rs +++ b/crates/loro-internal/src/log_store.rs @@ -9,7 +9,6 @@ use crate::{version::Frontiers, LoroValue}; pub use encoding::{EncodeMode, LoroEncoder}; pub(crate) use import::ImportContext; use std::{ - collections::BinaryHeap, marker::PhantomPinned, sync::{Arc, Mutex, MutexGuard, RwLock, Weak}, }; @@ -80,7 +79,7 @@ pub struct LogStore { pub(crate) this_client_id: ClientID, /// CRDT container manager pub(crate) reg: ContainerRegistry, - pending_changes: FxHashMap>>, + pending_changes: RemoteClientChanges, _pin: PhantomPinned, } diff --git a/crates/loro-internal/src/log_store/import.rs b/crates/loro-internal/src/log_store/import.rs index 4a50ab74..6fa5ca92 100644 --- a/crates/loro-internal/src/log_store/import.rs +++ b/crates/loro-internal/src/log_store/import.rs @@ -1,4 +1,4 @@ -use crate::change::{Change, Lamport}; +use crate::change::Change; use crate::hierarchy::Hierarchy; use crate::id::{ClientID, Counter, ID}; use crate::op::RemoteOp; @@ -12,20 +12,19 @@ use crate::{ }; use itertools::Itertools; use smallvec::{smallvec, SmallVec}; -use std::collections::BinaryHeap; use std::sync::Arc; use std::{collections::VecDeque, sync::MutexGuard}; use tracing::instrument; use fxhash::{FxHashMap, FxHashSet}; -use rle::{slice_vec_by, HasLength, RleVecWithIndex}; +use rle::{HasLength, RleVecWithIndex}; use crate::{ container::{registry::ContainerInstance, ContainerID, ContainerTrait}, dag::{remove_included_frontiers, DagUtils}, op::RichOp, - span::{HasCounter, HasIdSpan, HasLamportSpan, IdSpan}, + span::{HasIdSpan, HasLamportSpan, IdSpan}, version::are_frontiers_eq, VersionVector, }; @@ -91,7 +90,7 @@ impl LogStore { hierarchy: &mut Hierarchy, changes: RemoteClientChanges, ) -> Vec { - let changes = self.tailor_changes(changes); + let changes = self.process_and_queue_changes(changes); if changes.is_empty() { return vec![]; } @@ -360,7 +359,7 @@ impl LogStore { } } - fn tailor_changes(&mut self, changes: RemoteClientChanges) -> RemoteClientChanges { + fn process_and_queue_changes(&mut self, changes: RemoteClientChanges) -> RemoteClientChanges { let mut latest_vv = self.get_vv().clone(); let mut retain_changes = FxHashMap::default(); @@ -368,39 +367,40 @@ impl LogStore { // snapshot let client_ids: Vec<_> = latest_vv.keys().copied().collect(); for client_id in client_ids { - let counter = latest_vv.get_last(client_id).unwrap(); - self.try_apply_pending( - &ID { client_id, counter }, - &mut latest_vv, - &mut retain_changes, - ) + // let counter = latest_vv.get_last(client_id).unwrap(); + self.try_apply_pending(&client_id, &mut latest_vv, &mut retain_changes) } } else { - let mut client_to_pending_dep = FxHashMap::default(); + let mut pending_clients = FxHashSet::default(); changes .into_values() .flat_map(|c| c.into_iter()) .sorted_by(|a, b| Ord::cmp(&b.lamport, &a.lamport)) .for_each(|c| { - if let Some(pre_dep) = client_to_pending_dep.get(&c.id.client_id) { - self.pending_changes.get_mut(pre_dep).unwrap().push(c); + let c_client_id = c.id.client_id; + if pending_clients.contains(&c_client_id) { + self.pending_changes.get_mut(&c_client_id).unwrap().push(c); return; } match can_remote_change_be_applied(&latest_vv, &c) { ChangeApplyState::Directly => { latest_vv.set_end(c.id_end()); - let last_id = c.id_last(); + // let last_id = c.id_last(); retain_changes - .entry(c.id.client_id) + .entry(c_client_id) .or_insert_with(Vec::new) .push(c); - self.try_apply_pending(&last_id, &mut latest_vv, &mut retain_changes); + self.try_apply_pending( + &c_client_id, + &mut latest_vv, + &mut retain_changes, + ); } ChangeApplyState::Existing => {} ChangeApplyState::Future(dep) => { - client_to_pending_dep.insert(c.id.client_id, dep); + pending_clients.insert(c_client_id); self.pending_changes - .entry(dep) + .entry(dep.client_id) .or_insert_with(Vec::new) .push(c); } @@ -413,11 +413,11 @@ impl LogStore { fn try_apply_pending( &mut self, - dep: &ID, + client_id: &ClientID, latest_vv: &mut VersionVector, retain_changes: &mut RemoteClientChanges, ) { - if let Some(may_apply_changes) = self.pending_changes.remove(dep) { + if let Some(may_apply_changes) = self.pending_changes.remove(client_id) { let mut may_apply_iter = may_apply_changes .into_iter() .sorted_by(|a, b| a.lamport.cmp(&b.lamport)) @@ -426,21 +426,21 @@ impl LogStore { match can_remote_change_be_applied(latest_vv, peek_c) { ChangeApplyState::Directly => { let c = may_apply_iter.next().unwrap(); + let c_client_id = c.id.client_id; latest_vv.set_end(c.id_end()); - let last_id = c.id_last(); // other pending retain_changes - .entry(c.id.client_id) + .entry(c_client_id) .or_insert_with(Vec::new) .push(c); - self.try_apply_pending(&last_id, latest_vv, retain_changes); + self.try_apply_pending(&c_client_id, latest_vv, retain_changes); } ChangeApplyState::Existing => { may_apply_iter.next(); } ChangeApplyState::Future(this_dep) => { self.pending_changes - .entry(this_dep) + .entry(this_dep.client_id) .or_insert_with(Vec::new) .extend(may_apply_iter); break; @@ -454,7 +454,7 @@ impl LogStore { enum ChangeApplyState { Existing, Directly, - // The first dissatisfied deps: the previous change or it's deps + /// The id of first missing dep Future(ID), } @@ -582,4 +582,35 @@ mod test { a.decode(&b_change).unwrap(); assert_eq!(c.to_json(), a.to_json()); } + + #[test] + fn pending_changes_may_deps_merged_change() { + // a: (a1 <-- a2 <-- a3) <-- a4 a1~a3 is a merged change + // \ / + // b: b1 + let mut a = LoroCore::new(Default::default(), Some(1)); + let mut b = LoroCore::new(Default::default(), Some(2)); + let mut c = LoroCore::new(Default::default(), Some(3)); + let mut text_a = a.get_text("text"); + let mut text_b = b.get_text("text"); + text_a.insert(&a, 0, "a").unwrap(); + text_a.insert(&a, 1, "b").unwrap(); + let version_a12 = a.vv_cloned(); + let updates_a12 = a.encode_all(); + text_a.insert(&a, 2, "c").unwrap(); + let updates_a123 = a.encode_all(); + b.decode(&updates_a12).unwrap(); + text_b.insert(&b, 2, "d").unwrap(); + let update_b1 = b.encode_from(version_a12); + a.decode(&update_b1).unwrap(); + let version_a123_b1 = a.vv_cloned(); + text_a.insert(&a, 4, "e").unwrap(); + let update_a4 = a.encode_from(version_a123_b1); + c.decode(&update_b1).unwrap(); + assert_eq!(c.to_json().to_json(), "{}"); + c.decode(&update_a4).unwrap(); + assert_eq!(c.to_json().to_json(), "{}"); + c.decode(&updates_a123).unwrap(); + assert_eq!(c.to_json(), a.to_json()); + } }