fix: import change slice

This commit is contained in:
leeeon233 2023-03-28 20:09:33 +08:00
parent abafec9e18
commit e6a4be5dcf

View file

@ -2,7 +2,7 @@ use crate::change::Change;
use crate::hierarchy::Hierarchy; use crate::hierarchy::Hierarchy;
use crate::id::{ClientID, Counter, ID}; use crate::id::{ClientID, Counter, ID};
use crate::op::RemoteOp; use crate::op::RemoteOp;
use crate::span::{CounterSpan, HasCounterSpan}; use crate::span::{CounterSpan, HasCounter, HasCounterSpan};
use crate::version::PatchedVersionVector; use crate::version::PatchedVersionVector;
use crate::LogStore; use crate::LogStore;
use crate::{ use crate::{
@ -18,7 +18,7 @@ use tracing::instrument;
use fxhash::{FxHashMap, FxHashSet}; use fxhash::{FxHashMap, FxHashSet};
use rle::{HasLength, RleVecWithIndex}; use rle::{slice_vec_by, HasLength, RleVecWithIndex, Sliceable};
use crate::{ use crate::{
container::{registry::ContainerInstance, ContainerID, ContainerTrait}, container::{registry::ContainerInstance, ContainerID, ContainerTrait},
@ -201,6 +201,12 @@ impl LogStore {
.entry(*client_id) .entry(*client_id)
.or_insert_with(|| RleVecWithIndex::new_cfg(cfg.clone())); .or_insert_with(|| RleVecWithIndex::new_cfg(cfg.clone()));
for change in inner_changes { for change in inner_changes {
// if let Some(last) = rle.last() {
// assert_eq!(
// last.id.counter + last.atom_len() as Counter,
// change.id.counter
// )
// }
rle.push(change); rle.push(change);
} }
} }
@ -359,7 +365,10 @@ impl LogStore {
} }
} }
fn process_and_queue_changes(&mut self, changes: RemoteClientChanges) -> RemoteClientChanges { fn process_and_queue_changes(
&mut self,
mut changes: RemoteClientChanges,
) -> RemoteClientChanges {
let mut latest_vv = self.get_vv().clone(); let mut latest_vv = self.get_vv().clone();
let mut retain_changes = FxHashMap::default(); let mut retain_changes = FxHashMap::default();
@ -373,19 +382,20 @@ impl LogStore {
} else { } else {
// Changes will be sorted by lamport. If the first change cannot be applied, then all subsequent changes with the same client id cannot be applied either. // Changes will be sorted by lamport. If the first change cannot be applied, then all subsequent changes with the same client id cannot be applied either.
// we cache these client id. // we cache these client id.
// self.tailor_changes(&mut changes);
let mut pending_clients = FxHashSet::default(); let mut pending_clients = FxHashSet::default();
changes changes
.into_values() .into_values()
.flat_map(|c| c.into_iter()) .flat_map(|c| c.into_iter())
// sort changes by lamport from small to large // sort changes by lamport from small to large
.sorted_by(|a, b| a.lamport.cmp(&b.lamport)) .sorted_by(|a, b| a.lamport.cmp(&b.lamport))
.for_each(|c| { .for_each(|mut c| {
let c_client_id = c.id.client_id; let c_client_id = c.id.client_id;
if pending_clients.contains(&c_client_id) { if pending_clients.contains(&c_client_id) {
self.pending_changes.get_mut(&c_client_id).unwrap().push(c); self.pending_changes.get_mut(&c_client_id).unwrap().push(c);
return; return;
} }
match can_remote_change_be_applied(&latest_vv, &c) { match can_remote_change_be_applied(&latest_vv, &mut c) {
ChangeApplyState::Directly => { ChangeApplyState::Directly => {
latest_vv.set_end(c.id_end()); latest_vv.set_end(c.id_end());
retain_changes retain_changes
@ -409,7 +419,6 @@ impl LogStore {
} }
}); });
} }
retain_changes retain_changes
} }
@ -424,7 +433,7 @@ impl LogStore {
.into_iter() .into_iter()
.sorted_by(|a, b| a.lamport.cmp(&b.lamport)) .sorted_by(|a, b| a.lamport.cmp(&b.lamport))
.peekable(); .peekable();
while let Some(peek_c) = may_apply_iter.peek() { while let Some(peek_c) = may_apply_iter.peek_mut() {
match can_remote_change_be_applied(latest_vv, peek_c) { match can_remote_change_be_applied(latest_vv, peek_c) {
ChangeApplyState::Directly => { ChangeApplyState::Directly => {
let c = may_apply_iter.next().unwrap(); let c = may_apply_iter.next().unwrap();
@ -451,8 +460,30 @@ impl LogStore {
} }
} }
} }
fn tailor_changes(&mut self, changes: &mut RemoteClientChanges) {
changes.retain(|_, v| !v.is_empty());
for (client_id, changes) in changes.iter_mut() {
let self_end_ctr = self.vv.get(client_id).copied().unwrap_or(0);
let other_start_ctr = changes.first().unwrap().ctr_start();
match other_start_ctr.cmp(&self_end_ctr) {
std::cmp::Ordering::Less => {
*changes = slice_vec_by(
changes,
|x| x.id.counter as usize,
self_end_ctr as usize,
usize::MAX,
);
}
std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => {}
}
}
changes.retain(|_, v| !v.is_empty());
}
} }
#[derive(Debug)]
enum ChangeApplyState { enum ChangeApplyState {
Existing, Existing,
Directly, Directly,
@ -460,14 +491,17 @@ enum ChangeApplyState {
Future(ClientID), Future(ClientID),
} }
fn can_remote_change_be_applied(vv: &VersionVector, change: &Change<RemoteOp>) -> ChangeApplyState { fn can_remote_change_be_applied(
vv: &VersionVector,
change: &mut Change<RemoteOp>,
) -> ChangeApplyState {
let change_client_id = change.id.client_id; let change_client_id = change.id.client_id;
let CounterSpan { start, end } = change.ctr_span(); let CounterSpan { start, end } = change.ctr_span();
let vv_latest_ctr = vv.get(&change_client_id).copied().unwrap_or(0); let vv_latest_ctr = vv.get(&change_client_id).copied().unwrap_or(0);
if vv_latest_ctr < start { if vv_latest_ctr < start {
return ChangeApplyState::Future(change_client_id); return ChangeApplyState::Future(change_client_id);
} }
if vv_latest_ctr >= end { if vv_latest_ctr >= end || start == end {
return ChangeApplyState::Existing; return ChangeApplyState::Existing;
} }
for dep in &change.deps { for dep in &change.deps {
@ -476,12 +510,17 @@ fn can_remote_change_be_applied(vv: &VersionVector, change: &Change<RemoteOp>) -
return ChangeApplyState::Future(dep.client_id); return ChangeApplyState::Future(dep.client_id);
} }
} }
if start < vv_latest_ctr {
*change = change.slice((vv_latest_ctr - start) as usize, (end - start) as usize);
}
ChangeApplyState::Directly ChangeApplyState::Directly
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::{LoroCore, VersionVector}; use crate::{LoroCore, Transact, VersionVector};
#[test] #[test]
fn import_pending() { fn import_pending() {
@ -615,4 +654,24 @@ mod test {
c.decode(&updates_a123).unwrap(); c.decode(&updates_a123).unwrap();
assert_eq!(c.to_json(), a.to_json()); assert_eq!(c.to_json(), a.to_json());
} }
#[test]
fn applied_change_filter() {
let mut a = LoroCore::new(Default::default(), Some(1));
let mut b = LoroCore::new(Default::default(), Some(2));
let mut list_a = a.get_list("list");
let mut list_b = b.get_list("list");
{
let txn = a.transact();
list_a.insert(&txn, 0, "1").unwrap();
list_a.insert(&txn, 1, "1").unwrap();
}
b.decode(&a.encode_from(Default::default())).unwrap();
{
let txn = a.transact();
list_a.insert(&txn, 2, "1").unwrap();
list_a.insert(&txn, 3, "1").unwrap();
}
b.decode(&a.encode_from(Default::default())).unwrap();
}
} }