diff --git a/crates/loro-internal/src/log_store/import.rs b/crates/loro-internal/src/log_store/import.rs index 6fa5ca92..de837469 100644 --- a/crates/loro-internal/src/log_store/import.rs +++ b/crates/loro-internal/src/log_store/import.rs @@ -371,10 +371,13 @@ impl LogStore { self.try_apply_pending(&client_id, &mut latest_vv, &mut retain_changes) } } else { + // Changes will be sorted by lamport. If the first change cannot be applied, then all subsequent changes with the same client id cannot be applied either. + // we cache these client id. let mut pending_clients = FxHashSet::default(); changes .into_values() .flat_map(|c| c.into_iter()) + // sort changes by lamport from small to large .sorted_by(|a, b| Ord::cmp(&b.lamport, &a.lamport)) .for_each(|c| { let c_client_id = c.id.client_id; @@ -385,7 +388,6 @@ impl LogStore { match can_remote_change_be_applied(&latest_vv, &c) { ChangeApplyState::Directly => { latest_vv.set_end(c.id_end()); - // let last_id = c.id_last(); retain_changes .entry(c_client_id) .or_insert_with(Vec::new) @@ -397,10 +399,10 @@ impl LogStore { ); } ChangeApplyState::Existing => {} - ChangeApplyState::Future(dep) => { + ChangeApplyState::Future(this_dep_client) => { pending_clients.insert(c_client_id); self.pending_changes - .entry(dep.client_id) + .entry(this_dep_client) .or_insert_with(Vec::new) .push(c); } @@ -438,9 +440,9 @@ impl LogStore { ChangeApplyState::Existing => { may_apply_iter.next(); } - ChangeApplyState::Future(this_dep) => { + ChangeApplyState::Future(this_dep_client) => { self.pending_changes - .entry(this_dep.client_id) + .entry(this_dep_client) .or_insert_with(Vec::new) .extend(may_apply_iter); break; @@ -454,8 +456,8 @@ impl LogStore { enum ChangeApplyState { Existing, Directly, - /// The id of first missing dep - Future(ID), + /// The client id of first missing dep + Future(ClientID), } fn can_remote_change_be_applied(vv: &VersionVector, change: &Change) -> ChangeApplyState { @@ -463,7 +465,7 @@ fn can_remote_change_be_applied(vv: &VersionVector, change: &Change) - let CounterSpan { start, end } = change.ctr_span(); let vv_latest_ctr = vv.get(&change_client_id).copied().unwrap_or(0); if vv_latest_ctr < start { - return ChangeApplyState::Future(change.id.inc(-1)); + return ChangeApplyState::Future(change_client_id); } if vv_latest_ctr >= end { return ChangeApplyState::Existing; @@ -471,7 +473,7 @@ fn can_remote_change_be_applied(vv: &VersionVector, change: &Change) - for dep in &change.deps { let dep_vv_latest_ctr = vv.get(&dep.client_id).copied().unwrap_or(0); if dep_vv_latest_ctr - 1 < dep.counter { - return ChangeApplyState::Future(*dep); + return ChangeApplyState::Future(dep.client_id); } } ChangeApplyState::Directly