diff --git a/crates/language/src/tests.rs b/crates/language/src/tests.rs index af2d8ee911..a194c26625 100644 --- a/crates/language/src/tests.rs +++ b/crates/language/src/tests.rs @@ -821,7 +821,10 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { } 50..=59 if replica_ids.len() < max_peers => { let old_buffer = buffer.read(cx).to_proto(); - let new_replica_id = replica_ids.len() as ReplicaId; + let new_replica_id = (0..=replica_ids.len() as ReplicaId) + .filter(|replica_id| *replica_id != buffer.read(cx).replica_id()) + .choose(&mut rng) + .unwrap(); log::info!( "Adding new replica {} (replicating from {})", new_replica_id, @@ -830,6 +833,11 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { new_buffer = Some(cx.add_model(|cx| { let mut new_buffer = Buffer::from_proto(new_replica_id, old_buffer, None, cx).unwrap(); + log::info!( + "New replica {} text: {:?}", + new_buffer.replica_id(), + new_buffer.text() + ); new_buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200))); let network = network.clone(); cx.subscribe(&cx.handle(), move |buffer, _, event, _| { @@ -843,8 +851,33 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { .detach(); new_buffer })); - replica_ids.push(new_replica_id); network.borrow_mut().replicate(replica_id, new_replica_id); + + if new_replica_id as usize == replica_ids.len() { + replica_ids.push(new_replica_id); + } else { + let new_buffer = new_buffer.take().unwrap(); + while network.borrow().has_unreceived(new_replica_id) { + let ops = network + .borrow_mut() + .receive(new_replica_id) + .into_iter() + .map(|op| proto::deserialize_operation(op).unwrap()); + if ops.len() > 0 { + log::info!( + "peer {} (version: {:?}) applying {} ops from the network. {:?}", + new_replica_id, + buffer.read(cx).version(), + ops.len(), + ops + ); + new_buffer.update(cx, |new_buffer, cx| { + new_buffer.apply_ops(ops, cx).unwrap(); + }); + } + } + buffers[new_replica_id as usize] = new_buffer; + } } 60..=69 if mutation_count != 0 => { buffer.update(cx, |buffer, cx| { @@ -861,9 +894,11 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { .map(|op| proto::deserialize_operation(op).unwrap()); if ops.len() > 0 { log::info!( - "peer {} applying {} ops from the network.", + "peer {} (version: {:?}) applying {} ops from the network. {:?}", replica_id, - ops.len() + buffer.read(cx).version(), + ops.len(), + ops ); buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx).unwrap()); } @@ -886,6 +921,12 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { let first_buffer = buffers[0].read(cx).snapshot(); for buffer in &buffers[1..] { let buffer = buffer.read(cx).snapshot(); + assert_eq!( + buffer.version(), + first_buffer.version(), + "Replica {} version != Replica 0 version", + buffer.replica_id() + ); assert_eq!( buffer.text(), first_buffer.text(), @@ -915,7 +956,12 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { .filter(|(replica_id, _)| **replica_id != buffer.replica_id()) .map(|(replica_id, selections)| (*replica_id, selections.iter().collect::>())) .collect::>(); - assert_eq!(actual_remote_selections, expected_remote_selections); + assert_eq!( + actual_remote_selections, + expected_remote_selections, + "Replica {} remote selections != expected selections", + buffer.replica_id() + ); } } diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index 1c351079a7..ed918cd5c5 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -826,6 +826,8 @@ impl Buffer { edit.timestamp, ); self.snapshot.version.observe(edit.timestamp.local()); + self.local_clock.observe(edit.timestamp.local()); + self.lamport_clock.observe(edit.timestamp.lamport()); self.resolve_edit(edit.timestamp.local()); } } @@ -836,6 +838,7 @@ impl Buffer { if !self.version.observed(undo.id) { self.apply_undo(&undo)?; self.snapshot.version.observe(undo.id); + self.local_clock.observe(undo.id); self.lamport_clock.observe(lamport_timestamp); } } @@ -1033,8 +1036,6 @@ impl Buffer { self.snapshot.visible_text = visible_text; self.snapshot.deleted_text = deleted_text; self.snapshot.insertions.edit(new_insertions, &()); - self.local_clock.observe(timestamp.local()); - self.lamport_clock.observe(timestamp.lamport()); self.subscriptions.publish_mut(&edits); }