From c1a72c3d7e2feb73258177226cb1d02d16385b17 Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Fri, 24 Mar 2023 15:55:07 +0800 Subject: [PATCH] feat: pending import --- crates/loro-internal/src/log_store.rs | 5 + crates/loro-internal/src/log_store/import.rs | 164 ++++++++++++++++--- 2 files changed, 150 insertions(+), 19 deletions(-) diff --git a/crates/loro-internal/src/log_store.rs b/crates/loro-internal/src/log_store.rs index d0ea86f9..f6347d81 100644 --- a/crates/loro-internal/src/log_store.rs +++ b/crates/loro-internal/src/log_store.rs @@ -9,6 +9,7 @@ use crate::{version::Frontiers, LoroValue}; pub use encoding::{EncodeMode, LoroEncoder}; pub(crate) use import::ImportContext; use std::{ + collections::BinaryHeap, marker::PhantomPinned, sync::{Arc, Mutex, MutexGuard, RwLock, Weak}, }; @@ -31,6 +32,8 @@ use crate::{ ContainerType, Lamport, Op, Timestamp, VersionVector, ID, }; +use self::import::ChangesWithNegStartCounter; + const _YEAR: u64 = 365 * 24 * 60 * 60; const MONTH: u64 = 30 * 24 * 60 * 60; @@ -77,6 +80,7 @@ pub struct LogStore { pub(crate) this_client_id: ClientID, /// CRDT container manager pub(crate) reg: ContainerRegistry, + pending_changes: FxHashMap>, _pin: PhantomPinned, } @@ -94,6 +98,7 @@ impl LogStore { frontiers: Default::default(), vv: Default::default(), reg: ContainerRegistry::new(), + pending_changes: Default::default(), _pin: PhantomPinned, })) } diff --git a/crates/loro-internal/src/log_store/import.rs b/crates/loro-internal/src/log_store/import.rs index 58046452..b4aaa740 100644 --- a/crates/loro-internal/src/log_store/import.rs +++ b/crates/loro-internal/src/log_store/import.rs @@ -1,5 +1,7 @@ +use crate::change::Change; use crate::hierarchy::Hierarchy; -use crate::id::{Counter, ID}; +use crate::id::{ClientID, Counter, ID}; +use crate::op::RemoteOp; use crate::version::PatchedVersionVector; use crate::LogStore; use crate::{ @@ -8,6 +10,7 @@ use crate::{ version::{Frontiers, IdSpanVector}, }; use smallvec::{smallvec, SmallVec}; +use std::collections::BinaryHeap; use std::sync::Arc; use std::{collections::VecDeque, sync::MutexGuard}; use tracing::instrument; @@ -84,9 +87,9 @@ impl LogStore { pub(crate) fn import( &mut self, hierarchy: &mut Hierarchy, - mut changes: RemoteClientChanges, + changes: RemoteClientChanges, ) -> Vec { - self.tailor_changes(&mut changes); + let changes = self.tailor_changes(changes); if changes.is_empty() { return vec![]; } @@ -355,26 +358,149 @@ impl LogStore { } } - fn tailor_changes(&mut self, changes: &mut RemoteClientChanges) { + fn tailor_changes(&mut self, mut changes: RemoteClientChanges) -> RemoteClientChanges { changes.retain(|_, v| !v.is_empty()); for (client_id, changes) in changes.iter_mut() { - let self_end_ctr = self.vv.get(client_id).copied().unwrap_or(0); - let other_start_ctr = changes.first().unwrap().ctr_start(); - match other_start_ctr.cmp(&self_end_ctr) { - std::cmp::Ordering::Less => { - *changes = slice_vec_by( - changes, - |x| x.id.counter as usize, - self_end_ctr as usize, - usize::MAX, - ); - } - std::cmp::Ordering::Equal => {} - std::cmp::Ordering::Greater => { - unimplemented!("cache pending changes"); + self.filter_changes(client_id, changes); + } + changes.retain(|_, v| !v.is_empty()); + changes + } + + fn filter_changes(&mut self, client_id: &ClientID, changes: &mut Vec>) { + let self_end_ctr = self.vv.get(client_id).copied().unwrap_or(0); + let other_start_ctr = changes.first().unwrap().ctr_start(); + match other_start_ctr.cmp(&self_end_ctr) { + std::cmp::Ordering::Less => { + *changes = slice_vec_by( + changes, + |x| x.id.counter as usize, + self_end_ctr as usize, + usize::MAX, + ); + } + std::cmp::Ordering::Equal => {} + std::cmp::Ordering::Greater => { + let pending_changes = std::mem::take(changes); + self.pending_changes + .entry(*client_id) + .or_insert_with(BinaryHeap::new) + .push(ChangesWithNegStartCounter { + start_ctr: pending_changes.first().unwrap().ctr_start(), + changes: pending_changes, + }) + } + } + + // check whether the pending changes can be imported + let mut latest_end_ctr = self_end_ctr + changes.content_len() as i32; + if let Some(pending_heap) = self.pending_changes.get_mut(client_id) { + while let Some(ChangesWithNegStartCounter { + start_ctr, + changes: pending_changes, + }) = pending_heap.pop() + { + match start_ctr.cmp(&latest_end_ctr) { + std::cmp::Ordering::Less => { + let rest_changes = slice_vec_by( + &pending_changes, + |x| x.id.counter as usize, + latest_end_ctr as usize, + usize::MAX, + ); + latest_end_ctr += rest_changes.content_len() as i32; + changes.extend(rest_changes); + } + std::cmp::Ordering::Equal => { + latest_end_ctr += pending_changes.content_len() as i32; + changes.extend(pending_changes); + } + std::cmp::Ordering::Greater => { + pending_heap.push(ChangesWithNegStartCounter { + start_ctr, + changes: pending_changes, + }); + break; + } } } } - changes.retain(|_, v| !v.is_empty()); + } +} + +#[derive(Debug)] +pub(crate) struct ChangesWithNegStartCounter { + start_ctr: i32, + changes: Vec>, +} + +impl PartialEq for ChangesWithNegStartCounter { + fn eq(&self, other: &Self) -> bool { + self.start_ctr.eq(&other.start_ctr) + } +} + +impl Eq for ChangesWithNegStartCounter {} + +impl PartialOrd for ChangesWithNegStartCounter { + fn partial_cmp(&self, other: &Self) -> Option { + (-self.start_ctr).partial_cmp(&-other.start_ctr) + } +} + +impl Ord for ChangesWithNegStartCounter { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (-self.start_ctr).cmp(&-other.start_ctr) + } +} + +#[cfg(test)] +mod test { + use crate::{LoroCore, VersionVector}; + + #[test] + fn import_pending() { + let mut a = LoroCore::new(Default::default(), Some(1)); + let mut b = LoroCore::new(Default::default(), Some(2)); + let mut text_a = a.get_text("text"); + + text_a.insert(&a, 0, "a").unwrap(); + let update1 = a.encode_from(VersionVector::new()); + let version1 = a.vv_cloned(); + text_a.insert(&a, 0, "b").unwrap(); + let update2 = a.encode_from(version1); + let version2 = a.vv_cloned(); + text_a.insert(&a, 0, "c").unwrap(); + let update3 = a.encode_from(version2); + let version3 = a.vv_cloned(); + text_a.insert(&a, 0, "d").unwrap(); + let update4 = a.encode_from(version3); + let version4 = a.vv_cloned(); + text_a.insert(&a, 0, "e").unwrap(); + let update5 = a.encode_from(version4); + b.decode(&update5).unwrap(); + b.decode(&update4).unwrap(); + b.decode(&update1).unwrap(); + b.decode(&update3).unwrap(); + b.decode(&update2).unwrap(); + assert_eq!(a.to_json(), b.to_json()); + } + + #[test] + fn pending_import_snapshot() { + let mut a = LoroCore::new(Default::default(), Some(1)); + let mut b = LoroCore::new(Default::default(), Some(2)); + let mut text_a = a.get_text("text"); + + text_a.insert(&a, 0, "a").unwrap(); + let update1 = a.encode_all(); + let version1 = a.vv_cloned(); + text_a.insert(&a, 1, "b").unwrap(); + let update2 = a.encode_from(version1.clone()); + let _version2 = a.vv_cloned(); + println!("{:?} {:?}", version1, _version2); + b.decode(&update2).unwrap(); + b.decode(&update1).unwrap(); + assert_eq!(a.to_json(), b.to_json()); } }