feat: pending import

This commit is contained in:
leeeon233 2023-03-24 15:55:07 +08:00 committed by Leonzhao
parent 6df69bd2be
commit c1a72c3d7e
2 changed files with 150 additions and 19 deletions

View file

@ -9,6 +9,7 @@ use crate::{version::Frontiers, LoroValue};
pub use encoding::{EncodeMode, LoroEncoder};
pub(crate) use import::ImportContext;
use std::{
collections::BinaryHeap,
marker::PhantomPinned,
sync::{Arc, Mutex, MutexGuard, RwLock, Weak},
};
@ -31,6 +32,8 @@ use crate::{
ContainerType, Lamport, Op, Timestamp, VersionVector, ID,
};
use self::import::ChangesWithNegStartCounter;
const _YEAR: u64 = 365 * 24 * 60 * 60;
const MONTH: u64 = 30 * 24 * 60 * 60;
@ -77,6 +80,7 @@ pub struct LogStore {
pub(crate) this_client_id: ClientID,
/// CRDT container manager
pub(crate) reg: ContainerRegistry,
pending_changes: FxHashMap<ClientID, BinaryHeap<ChangesWithNegStartCounter>>,
_pin: PhantomPinned,
}
@ -94,6 +98,7 @@ impl LogStore {
frontiers: Default::default(),
vv: Default::default(),
reg: ContainerRegistry::new(),
pending_changes: Default::default(),
_pin: PhantomPinned,
}))
}

View file

@ -1,5 +1,7 @@
use crate::change::Change;
use crate::hierarchy::Hierarchy;
use crate::id::{Counter, ID};
use crate::id::{ClientID, Counter, ID};
use crate::op::RemoteOp;
use crate::version::PatchedVersionVector;
use crate::LogStore;
use crate::{
@ -8,6 +10,7 @@ use crate::{
version::{Frontiers, IdSpanVector},
};
use smallvec::{smallvec, SmallVec};
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::{collections::VecDeque, sync::MutexGuard};
use tracing::instrument;
@ -84,9 +87,9 @@ impl LogStore {
pub(crate) fn import(
&mut self,
hierarchy: &mut Hierarchy,
mut changes: RemoteClientChanges,
changes: RemoteClientChanges,
) -> Vec<RawEvent> {
self.tailor_changes(&mut changes);
let changes = self.tailor_changes(changes);
if changes.is_empty() {
return vec![];
}
@ -355,26 +358,149 @@ impl LogStore {
}
}
fn tailor_changes(&mut self, changes: &mut RemoteClientChanges) {
fn tailor_changes(&mut self, mut changes: RemoteClientChanges) -> RemoteClientChanges {
changes.retain(|_, v| !v.is_empty());
for (client_id, changes) in changes.iter_mut() {
let self_end_ctr = self.vv.get(client_id).copied().unwrap_or(0);
let other_start_ctr = changes.first().unwrap().ctr_start();
match other_start_ctr.cmp(&self_end_ctr) {
std::cmp::Ordering::Less => {
*changes = slice_vec_by(
changes,
|x| x.id.counter as usize,
self_end_ctr as usize,
usize::MAX,
);
}
std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => {
unimplemented!("cache pending changes");
self.filter_changes(client_id, changes);
}
changes.retain(|_, v| !v.is_empty());
changes
}
fn filter_changes(&mut self, client_id: &ClientID, changes: &mut Vec<Change<RemoteOp>>) {
let self_end_ctr = self.vv.get(client_id).copied().unwrap_or(0);
let other_start_ctr = changes.first().unwrap().ctr_start();
match other_start_ctr.cmp(&self_end_ctr) {
std::cmp::Ordering::Less => {
*changes = slice_vec_by(
changes,
|x| x.id.counter as usize,
self_end_ctr as usize,
usize::MAX,
);
}
std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => {
let pending_changes = std::mem::take(changes);
self.pending_changes
.entry(*client_id)
.or_insert_with(BinaryHeap::new)
.push(ChangesWithNegStartCounter {
start_ctr: pending_changes.first().unwrap().ctr_start(),
changes: pending_changes,
})
}
}
// check whether the pending changes can be imported
let mut latest_end_ctr = self_end_ctr + changes.content_len() as i32;
if let Some(pending_heap) = self.pending_changes.get_mut(client_id) {
while let Some(ChangesWithNegStartCounter {
start_ctr,
changes: pending_changes,
}) = pending_heap.pop()
{
match start_ctr.cmp(&latest_end_ctr) {
std::cmp::Ordering::Less => {
let rest_changes = slice_vec_by(
&pending_changes,
|x| x.id.counter as usize,
latest_end_ctr as usize,
usize::MAX,
);
latest_end_ctr += rest_changes.content_len() as i32;
changes.extend(rest_changes);
}
std::cmp::Ordering::Equal => {
latest_end_ctr += pending_changes.content_len() as i32;
changes.extend(pending_changes);
}
std::cmp::Ordering::Greater => {
pending_heap.push(ChangesWithNegStartCounter {
start_ctr,
changes: pending_changes,
});
break;
}
}
}
}
changes.retain(|_, v| !v.is_empty());
}
}
#[derive(Debug)]
pub(crate) struct ChangesWithNegStartCounter {
start_ctr: i32,
changes: Vec<Change<RemoteOp>>,
}
impl PartialEq for ChangesWithNegStartCounter {
fn eq(&self, other: &Self) -> bool {
self.start_ctr.eq(&other.start_ctr)
}
}
impl Eq for ChangesWithNegStartCounter {}
impl PartialOrd for ChangesWithNegStartCounter {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
(-self.start_ctr).partial_cmp(&-other.start_ctr)
}
}
impl Ord for ChangesWithNegStartCounter {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
(-self.start_ctr).cmp(&-other.start_ctr)
}
}
#[cfg(test)]
mod test {
use crate::{LoroCore, VersionVector};
#[test]
fn import_pending() {
let mut a = LoroCore::new(Default::default(), Some(1));
let mut b = LoroCore::new(Default::default(), Some(2));
let mut text_a = a.get_text("text");
text_a.insert(&a, 0, "a").unwrap();
let update1 = a.encode_from(VersionVector::new());
let version1 = a.vv_cloned();
text_a.insert(&a, 0, "b").unwrap();
let update2 = a.encode_from(version1);
let version2 = a.vv_cloned();
text_a.insert(&a, 0, "c").unwrap();
let update3 = a.encode_from(version2);
let version3 = a.vv_cloned();
text_a.insert(&a, 0, "d").unwrap();
let update4 = a.encode_from(version3);
let version4 = a.vv_cloned();
text_a.insert(&a, 0, "e").unwrap();
let update5 = a.encode_from(version4);
b.decode(&update5).unwrap();
b.decode(&update4).unwrap();
b.decode(&update1).unwrap();
b.decode(&update3).unwrap();
b.decode(&update2).unwrap();
assert_eq!(a.to_json(), b.to_json());
}
#[test]
fn pending_import_snapshot() {
let mut a = LoroCore::new(Default::default(), Some(1));
let mut b = LoroCore::new(Default::default(), Some(2));
let mut text_a = a.get_text("text");
text_a.insert(&a, 0, "a").unwrap();
let update1 = a.encode_all();
let version1 = a.vv_cloned();
text_a.insert(&a, 1, "b").unwrap();
let update2 = a.encode_from(version1.clone());
let _version2 = a.vv_cloned();
println!("{:?} {:?}", version1, _version2);
b.decode(&update2).unwrap();
b.decode(&update1).unwrap();
assert_eq!(a.to_json(), b.to_json());
}
}