doc: import pending

This commit is contained in:
leeeon233 2023-03-26 12:07:29 +08:00 committed by Leonzhao
parent d4f786e64a
commit e6c5cefe54

View file

@ -371,10 +371,13 @@ impl LogStore {
self.try_apply_pending(&client_id, &mut latest_vv, &mut retain_changes) self.try_apply_pending(&client_id, &mut latest_vv, &mut retain_changes)
} }
} else { } else {
// Changes will be sorted by lamport. If the first change cannot be applied, then all subsequent changes with the same client id cannot be applied either.
// we cache these client id.
let mut pending_clients = FxHashSet::default(); let mut pending_clients = FxHashSet::default();
changes changes
.into_values() .into_values()
.flat_map(|c| c.into_iter()) .flat_map(|c| c.into_iter())
// sort changes by lamport from small to large
.sorted_by(|a, b| Ord::cmp(&b.lamport, &a.lamport)) .sorted_by(|a, b| Ord::cmp(&b.lamport, &a.lamport))
.for_each(|c| { .for_each(|c| {
let c_client_id = c.id.client_id; let c_client_id = c.id.client_id;
@ -385,7 +388,6 @@ impl LogStore {
match can_remote_change_be_applied(&latest_vv, &c) { match can_remote_change_be_applied(&latest_vv, &c) {
ChangeApplyState::Directly => { ChangeApplyState::Directly => {
latest_vv.set_end(c.id_end()); latest_vv.set_end(c.id_end());
// let last_id = c.id_last();
retain_changes retain_changes
.entry(c_client_id) .entry(c_client_id)
.or_insert_with(Vec::new) .or_insert_with(Vec::new)
@ -397,10 +399,10 @@ impl LogStore {
); );
} }
ChangeApplyState::Existing => {} ChangeApplyState::Existing => {}
ChangeApplyState::Future(dep) => { ChangeApplyState::Future(this_dep_client) => {
pending_clients.insert(c_client_id); pending_clients.insert(c_client_id);
self.pending_changes self.pending_changes
.entry(dep.client_id) .entry(this_dep_client)
.or_insert_with(Vec::new) .or_insert_with(Vec::new)
.push(c); .push(c);
} }
@ -438,9 +440,9 @@ impl LogStore {
ChangeApplyState::Existing => { ChangeApplyState::Existing => {
may_apply_iter.next(); may_apply_iter.next();
} }
ChangeApplyState::Future(this_dep) => { ChangeApplyState::Future(this_dep_client) => {
self.pending_changes self.pending_changes
.entry(this_dep.client_id) .entry(this_dep_client)
.or_insert_with(Vec::new) .or_insert_with(Vec::new)
.extend(may_apply_iter); .extend(may_apply_iter);
break; break;
@ -454,8 +456,8 @@ impl LogStore {
enum ChangeApplyState { enum ChangeApplyState {
Existing, Existing,
Directly, Directly,
/// The id of first missing dep /// The client id of first missing dep
Future(ID), Future(ClientID),
} }
fn can_remote_change_be_applied(vv: &VersionVector, change: &Change<RemoteOp>) -> ChangeApplyState { fn can_remote_change_be_applied(vv: &VersionVector, change: &Change<RemoteOp>) -> ChangeApplyState {
@ -463,7 +465,7 @@ fn can_remote_change_be_applied(vv: &VersionVector, change: &Change<RemoteOp>) -
let CounterSpan { start, end } = change.ctr_span(); let CounterSpan { start, end } = change.ctr_span();
let vv_latest_ctr = vv.get(&change_client_id).copied().unwrap_or(0); let vv_latest_ctr = vv.get(&change_client_id).copied().unwrap_or(0);
if vv_latest_ctr < start { if vv_latest_ctr < start {
return ChangeApplyState::Future(change.id.inc(-1)); return ChangeApplyState::Future(change_client_id);
} }
if vv_latest_ctr >= end { if vv_latest_ctr >= end {
return ChangeApplyState::Existing; return ChangeApplyState::Existing;
@ -471,7 +473,7 @@ fn can_remote_change_be_applied(vv: &VersionVector, change: &Change<RemoteOp>) -
for dep in &change.deps { for dep in &change.deps {
let dep_vv_latest_ctr = vv.get(&dep.client_id).copied().unwrap_or(0); let dep_vv_latest_ctr = vv.get(&dep.client_id).copied().unwrap_or(0);
if dep_vv_latest_ctr - 1 < dep.counter { if dep_vv_latest_ctr - 1 < dep.counter {
return ChangeApplyState::Future(*dep); return ChangeApplyState::Future(dep.client_id);
} }
} }
ChangeApplyState::Directly ChangeApplyState::Directly