diff --git a/crates/loro-internal/src/log_store/import.rs b/crates/loro-internal/src/log_store/import.rs index af55589c..7cbc765f 100644 --- a/crates/loro-internal/src/log_store/import.rs +++ b/crates/loro-internal/src/log_store/import.rs @@ -362,72 +362,53 @@ impl LogStore { fn tailor_changes(&mut self, changes: RemoteClientChanges) -> RemoteClientChanges { let mut latest_vv = self.get_vv().clone(); - println!("#####Decode 开始 changes: {:?}", changes); - println!("latest vv {:?}", latest_vv); - self.debug_pending(); - let mut retain_changes = FxHashMap::default(); - let mut client_to_pending_dep = FxHashMap::default(); - changes - .into_values() - .flat_map(|c| c.into_iter()) - .sorted_by(|a, b| Ord::cmp(&b.lamport, &a.lamport)) - .for_each(|c| { - print!("当前 "); - debug_remote_change(&c); - println!(""); - if let Some(pre_dep) = client_to_pending_dep.get(&c.id.client_id) { - self.pending_changes.get_mut(pre_dep).unwrap().push(c); - return; - } - match can_remote_change_be_applied(&latest_vv, &c) { - ChangeApplyState::Directly => { - println!("apply"); - latest_vv.set_end(c.id_end()); - let last_id = c.id_last(); - retain_changes - .entry(c.id.client_id) - .or_insert_with(Vec::new) - .push(c); - self.try_apply_pending(&last_id, &mut latest_vv, &mut retain_changes); + + if changes.values().map(|c| c.len()).sum::() == 0 { + // snapshot + let client_ids: Vec<_> = latest_vv.keys().copied().collect(); + for client_id in client_ids { + let counter = latest_vv.get_last(client_id).unwrap(); + self.try_apply_pending( + &ID { client_id, counter }, + &mut latest_vv, + &mut retain_changes, + ) + } + } else { + let mut client_to_pending_dep = FxHashMap::default(); + changes + .into_values() + .flat_map(|c| c.into_iter()) + .sorted_by(|a, b| Ord::cmp(&b.lamport, &a.lamport)) + .for_each(|c| { + if let Some(pre_dep) = client_to_pending_dep.get(&c.id.client_id) { + self.pending_changes.get_mut(pre_dep).unwrap().push(c); + return; } - ChangeApplyState::Existing => { - println!("exist") + match can_remote_change_be_applied(&latest_vv, &c) { + ChangeApplyState::Directly => { + latest_vv.set_end(c.id_end()); + let last_id = c.id_last(); + retain_changes + .entry(c.id.client_id) + .or_insert_with(Vec::new) + .push(c); + self.try_apply_pending(&last_id, &mut latest_vv, &mut retain_changes); + } + ChangeApplyState::Existing => {} + ChangeApplyState::Future(dep) => { + client_to_pending_dep.insert(c.id.client_id, dep); + self.pending_changes + .entry(dep) + .or_insert_with(Vec::new) + .push(c); + } } - ChangeApplyState::Future(dep) => { - println!("future dep {:?}", dep); - client_to_pending_dep.insert(c.id.client_id, dep); - self.pending_changes - .entry(dep) - .or_insert_with(Vec::new) - .push(c); - } - } - }); - self.debug_pending(); - // retain_changes - // .values_mut() - // .for_each(|v| v.sort_by(|a, b| Ord::cmp(&a.lamport, &b.lamport))); - println!("!!!!!Decode 结束 \n{:?}", retain_changes); + }); + } retain_changes - // cancel filter empty changes, snapshot can use empty changes to check pending changes - // changes.retain(|_, v| !v.is_empty()); - // for (client_id, changes) in changes.iter_mut() { - // self.filter_changes(client_id, changes); - // } - // changes.retain(|_, v| !v.is_empty()); - // changes - } - - fn debug_pending(&self) { - println!("pending:"); - for (k, v) in self.pending_changes.iter() { - print!(" {:?}: ", k); - v.iter().for_each(debug_remote_change); - println!(""); - } - println!("") } fn try_apply_pending( @@ -437,7 +418,6 @@ impl LogStore { retain_changes: &mut RemoteClientChanges, ) { if let Some(may_apply_changes) = self.pending_changes.remove(dep) { - println!(" 有此依赖 {:?} ", dep); let mut may_apply_iter = may_apply_changes .into_iter() .sorted_by(|a, b| a.lamport.cmp(&b.lamport)) @@ -446,9 +426,6 @@ impl LogStore { match can_remote_change_be_applied(latest_vv, peek_c) { ChangeApplyState::Directly => { let c = may_apply_iter.next().unwrap(); - print!("apply "); - debug_remote_change(&c); - println!(""); latest_vv.set_end(c.id_end()); let last_id = c.id_last(); // other pending @@ -500,21 +477,6 @@ fn can_remote_change_be_applied(vv: &VersionVector, change: &Change) - ChangeApplyState::Directly } -fn debug_remote_change(change: &Change) { - print!( - "Change: id_span: {:?} deps: {:?} lamport {}, ", - change.id_span(), - change.deps, - change.lamport - ); -} - -// #[derive(Debug)] -// pub(crate) struct ChangeWithLamport { -// change: Change, -// lamport: Lamport, -// } - #[cfg(test)] mod test { use crate::{LoroCore, VersionVector}; @@ -584,12 +546,12 @@ mod test { let version_a1b1 = a.vv_cloned(); text_a.insert(&a, 2, "c").unwrap(); let update_a2 = a.encode_from(version_a1b1); - // c.decode(&update_a2).unwrap(); - // assert_eq!(c.to_json().to_json(), "{}"); - // c.decode(&update_a1).unwrap(); - // assert_eq!(c.to_json().to_json(), "{\"text\":\"a\"}"); - // c.decode(&update_b1).unwrap(); - // assert_eq!(a.to_json(), c.to_json()); + c.decode(&update_a2).unwrap(); + assert_eq!(c.to_json().to_json(), "{}"); + c.decode(&update_a1).unwrap(); + assert_eq!(c.to_json().to_json(), "{\"text\":\"a\"}"); + c.decode(&update_b1).unwrap(); + assert_eq!(a.to_json(), c.to_json()); d.decode(&update_a2).unwrap(); assert_eq!(d.to_json().to_json(), "{}"); diff --git a/crates/loro-internal/src/version.rs b/crates/loro-internal/src/version.rs index 49e35353..22c788ba 100644 --- a/crates/loro-internal/src/version.rs +++ b/crates/loro-internal/src/version.rs @@ -327,7 +327,7 @@ impl VersionVector { } #[inline] - pub fn get_last(&mut self, client_id: ClientID) -> Option { + pub fn get_last(&self, client_id: ClientID) -> Option { self.0 .get(&client_id) .and_then(|&x| if x == 0 { None } else { Some(x - 1) })