fix: change deps bug

This commit is contained in:
Zixuan Chen 2022-10-21 22:28:39 +08:00
parent a2fcd73b44
commit 6dcd9d19e8
5 changed files with 49 additions and 31 deletions

View file

@ -106,9 +106,7 @@ impl Mergable<ChangeMergeCfg> for Change {
return false; return false;
} }
if other.deps.is_empty() if other.deps.is_empty() || !(other.deps.len() == 1 && self.last_id() == other.deps[0]) {
|| (other.deps.len() == 1 && self.id.is_connected_id(&other.deps[0], self.len()))
{
return false; return false;
} }

View file

@ -167,7 +167,7 @@ impl Container for TextContainer {
"Stage1 retreat:{} forward:{}\n{}", "Stage1 retreat:{} forward:{}\n{}",
format!("{:?}", &iter.retreat).red(), format!("{:?}", &iter.retreat).red(),
format!("{:?}", &iter.forward).red(), format!("{:?}", &iter.forward).red(),
// format!("{:#?}", &change).blue(), format!("{:#?}", &change).blue(),
); );
for op in change.ops.iter() { for op in change.ops.iter() {
if op.container == self.id { if op.container == self.id {

View file

@ -291,7 +291,7 @@ pub fn test_multi_sites(site_num: u8, mut actions: Vec<Action>) {
for action in actions.iter_mut() { for action in actions.iter_mut() {
sites.preprocess(action); sites.preprocess(action);
applied.push(action.clone()); applied.push(action.clone());
println!("{}", (&applied).table()); debug_log!("{}", (&applied).table());
sites.apply_action(action); sites.apply_action(action);
} }
@ -308,24 +308,24 @@ mod test {
use super::*; use super::*;
#[test] #[test]
fn test_two_1() { fn test_two_change_deps_issue() {
test_multi_sites( test_multi_sites(
2, 2,
vec![ vec![
Ins { Ins {
content: "1".into(), content: "12345".into(),
pos: 0, pos: 281479272970938,
site: 0, site: 21,
},
Sync { from: 0, to: 1 },
Del {
pos: 0,
len: 1,
site: 1,
}, },
Ins { Ins {
content: "2".into(), content: "67890".into(),
pos: 1, pos: 17870294359908942010,
site: 248,
},
Sync { from: 1, to: 0 },
Ins {
content: "abc".into(),
pos: 186,
site: 0, site: 0,
}, },
], ],

View file

@ -106,7 +106,7 @@ impl LogStore {
{ {
check_import_change_valid(&change); check_import_change_valid(&change);
// TODO: cache pending changes // TODO: cache pending changes
assert!(change.deps.iter().all(|x| self_vv.includes_id(*x))); assert!(change.deps.iter().all(|x| self.vv().includes_id(*x)));
self.apply_remote_change(change) self.apply_remote_change(change)
} }
} }
@ -182,28 +182,51 @@ impl LogStore {
&self.frontier &self.frontier
} }
fn update_frontier(&mut self, clear: &[ID], new: &[ID]) {
self.frontier.retain(|x| {
!clear
.iter()
.any(|y| x.client_id == y.client_id && x.counter <= y.counter)
&& !new
.iter()
.any(|y| x.client_id == y.client_id && x.counter <= y.counter)
});
for next in new.iter() {
if self
.frontier
.iter()
.any(|x| x.client_id == next.client_id && x.counter >= next.counter)
{
continue;
}
self.frontier.push(*next);
}
}
/// this method would not get the container and apply op /// this method would not get the container and apply op
pub fn append_local_ops(&mut self, ops: Vec<Op>) { pub fn append_local_ops(&mut self, ops: Vec<Op>) {
if ops.is_empty() {
return;
}
let lamport = self.next_lamport(); let lamport = self.next_lamport();
let timestamp = (self.cfg.get_time)(); let timestamp = (self.cfg.get_time)();
let id = ID { let id = ID {
client_id: self.this_client_id, client_id: self.this_client_id,
counter: self.get_next_counter(self.this_client_id), counter: self.get_next_counter(self.this_client_id),
}; };
let last_id = ops.last().unwrap().id_last();
let change = Change { let change = Change {
id, id,
deps: std::mem::replace(&mut self.frontier, smallvec::smallvec![last_id]),
ops: ops.into(), ops: ops.into(),
deps: std::mem::take(&mut self.frontier),
lamport, lamport,
timestamp, timestamp,
freezed: false, freezed: false,
break_points: Default::default(), break_points: Default::default(),
}; };
self.frontier.push(ID::new(
self.this_client_id,
id.counter + change.len() as Counter - 1,
));
self.latest_lamport = lamport + change.len() as u32 - 1; self.latest_lamport = lamport + change.len() as u32 - 1;
self.latest_timestamp = timestamp; self.latest_timestamp = timestamp;
self.vv.set_end(change.id_end()); self.vv.set_end(change.id_end());
@ -211,6 +234,8 @@ impl LogStore {
.entry(self.this_client_id) .entry(self.this_client_id)
.or_insert_with(RleVec::new) .or_insert_with(RleVec::new)
.push(change); .push(change);
debug_log!("CHANGES---------------- site {}", self.this_client_id);
} }
pub fn apply_remote_change(&mut self, mut change: Change) { pub fn apply_remote_change(&mut self, mut change: Change) {
@ -239,14 +264,9 @@ impl LogStore {
container.apply(change.id_span(), self); container.apply(change.id_span(), self);
} }
drop(container_manager);
self.vv.set_end(change.id_end()); self.vv.set_end(change.id_end());
self.frontier = self self.update_frontier(&change.deps, &[change.last_id()]);
.frontier
.iter()
.filter(|x| !change.deps.contains(x))
.copied()
.collect();
self.frontier.push(change.last_id());
if change.last_lamport() > self.latest_lamport { if change.last_lamport() > self.latest_lamport {
self.latest_lamport = change.last_lamport(); self.latest_lamport = change.last_lamport();

View file

@ -30,7 +30,7 @@ macro_rules! debug_log {
}; };
($($arg:tt)*) => {{ ($($arg:tt)*) => {{
if cfg!(test) { if cfg!(test) {
// use colored::Colorize; use colored::Colorize;
// print!("{}:{}\t", file!().purple(), line!().to_string().purple()); // print!("{}:{}\t", file!().purple(), line!().to_string().purple());
// println!($($arg)*); // println!($($arg)*);
} }