diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index f1bc581932..061d956444 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -73,8 +73,6 @@ pub struct Buffer { language_server: Option, completion_triggers: Vec, deferred_ops: OperationQueue, - #[cfg(test)] - pub(crate) operations: Vec, } pub struct BufferSnapshot { @@ -143,7 +141,7 @@ struct LanguageServerSnapshot { path: Arc, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Operation { Buffer(text::Operation), UpdateDiagnostics { @@ -160,8 +158,9 @@ pub enum Operation { }, } -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Event { + Operation(Operation), Edited, Dirtied, Saved, @@ -202,8 +201,6 @@ pub trait File { cx: &mut MutableAppContext, ) -> Task>; - fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext); - fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext); fn as_any(&self) -> &dyn Any; @@ -276,8 +273,6 @@ impl File for FakeFile { cx.spawn(|_| async move { Ok((Default::default(), SystemTime::UNIX_EPOCH)) }) } - fn buffer_updated(&self, _: u64, _: Operation, _: &mut MutableAppContext) {} - fn buffer_removed(&self, _: u64, _: &mut MutableAppContext) {} fn as_any(&self) -> &dyn Any { @@ -526,8 +521,6 @@ impl Buffer { language_server: None, completion_triggers: Default::default(), deferred_ops: OperationQueue::new(), - #[cfg(test)] - operations: Default::default(), } } @@ -1745,16 +1738,8 @@ impl Buffer { } } - #[cfg(not(test))] - pub fn send_operation(&mut self, operation: Operation, cx: &mut ModelContext) { - if let Some(file) = &self.file { - file.buffer_updated(self.remote_id(), operation, cx.as_mut()); - } - } - - #[cfg(test)] - pub fn send_operation(&mut self, operation: Operation, _: &mut ModelContext) { - self.operations.push(operation); + fn send_operation(&mut self, operation: Operation, cx: &mut ModelContext) { + cx.emit(Event::Operation(operation)); } pub fn remove_peer(&mut self, replica_id: ReplicaId, cx: &mut ModelContext) { diff --git a/crates/language/src/tests.rs b/crates/language/src/tests.rs index 112073aed6..5ccd400e0c 100644 --- a/crates/language/src/tests.rs +++ b/crates/language/src/tests.rs @@ -76,43 +76,48 @@ fn test_edit_events(cx: &mut gpui::MutableAppContext) { let buffer1 = cx.add_model(|cx| Buffer::new(0, "abcdef", cx)); let buffer2 = cx.add_model(|cx| Buffer::new(1, "abcdef", cx)); - let buffer_ops = buffer1.update(cx, |buffer, cx| { - let buffer_1_events = buffer_1_events.clone(); - cx.subscribe(&buffer1, move |_, _, event, _| { - buffer_1_events.borrow_mut().push(event.clone()) - }) - .detach(); - let buffer_2_events = buffer_2_events.clone(); - cx.subscribe(&buffer2, move |_, _, event, _| { - buffer_2_events.borrow_mut().push(event.clone()) - }) - .detach(); + let buffer1_ops = Rc::new(RefCell::new(Vec::new())); + buffer1.update(cx, { + let buffer1_ops = buffer1_ops.clone(); + |buffer, cx| { + let buffer_1_events = buffer_1_events.clone(); + cx.become_delegate(&buffer1, move |_, _, event, _| match event { + Event::Operation(op) => buffer1_ops.borrow_mut().push(op), + event @ _ => buffer_1_events.borrow_mut().push(event), + }) + .detach(); + let buffer_2_events = buffer_2_events.clone(); + cx.subscribe(&buffer2, move |_, _, event, _| { + buffer_2_events.borrow_mut().push(event.clone()) + }) + .detach(); - // An edit emits an edited event, followed by a dirtied event, - // since the buffer was previously in a clean state. - buffer.edit(Some(2..4), "XYZ", cx); + // An edit emits an edited event, followed by a dirtied event, + // since the buffer was previously in a clean state. + buffer.edit(Some(2..4), "XYZ", cx); - // An empty transaction does not emit any events. - buffer.start_transaction(); - buffer.end_transaction(cx); + // An empty transaction does not emit any events. + buffer.start_transaction(); + buffer.end_transaction(cx); - // A transaction containing two edits emits one edited event. - now += Duration::from_secs(1); - buffer.start_transaction_at(now); - buffer.edit(Some(5..5), "u", cx); - buffer.edit(Some(6..6), "w", cx); - buffer.end_transaction_at(now, cx); + // A transaction containing two edits emits one edited event. + now += Duration::from_secs(1); + buffer.start_transaction_at(now); + buffer.edit(Some(5..5), "u", cx); + buffer.edit(Some(6..6), "w", cx); + buffer.end_transaction_at(now, cx); - // Undoing a transaction emits one edited event. - buffer.undo(cx); - - buffer.operations.clone() + // Undoing a transaction emits one edited event. + buffer.undo(cx); + } }); // Incorporating a set of remote ops emits a single edited event, // followed by a dirtied event. buffer2.update(cx, |buffer, cx| { - buffer.apply_ops(buffer_ops, cx).unwrap(); + buffer + .apply_ops(buffer1_ops.borrow_mut().drain(..), cx) + .unwrap(); }); let buffer_1_events = buffer_1_events.borrow(); @@ -1177,17 +1182,26 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { .collect::(); let mut replica_ids = Vec::new(); let mut buffers = Vec::new(); - let mut network = Network::new(rng.clone()); + let network = Rc::new(RefCell::new(Network::new(rng.clone()))); for i in 0..rng.gen_range(min_peers..=max_peers) { let buffer = cx.add_model(|cx| { let mut buffer = Buffer::new(i as ReplicaId, base_text.as_str(), cx); buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200))); + let network = network.clone(); + cx.become_delegate(&cx.handle(), move |buffer, _, event, _| { + if let Event::Operation(op) = event { + network + .borrow_mut() + .broadcast(buffer.replica_id(), vec![proto::serialize_operation(&op)]); + } + }) + .detach(); buffer }); buffers.push(buffer); replica_ids.push(i as ReplicaId); - network.add_peer(i as ReplicaId); + network.borrow_mut().add_peer(i as ReplicaId); log::info!("Adding initial peer with replica id {}", i); } @@ -1268,10 +1282,20 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { let mut new_buffer = Buffer::from_proto(new_replica_id, old_buffer, None, cx).unwrap(); new_buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200))); + let network = network.clone(); + cx.become_delegate(&cx.handle(), move |buffer, _, event, _| { + if let Event::Operation(op) = event { + network.borrow_mut().broadcast( + buffer.replica_id(), + vec![proto::serialize_operation(&op)], + ); + } + }) + .detach(); new_buffer })); replica_ids.push(new_replica_id); - network.replicate(replica_id, new_replica_id); + network.borrow_mut().replicate(replica_id, new_replica_id); } 60..=69 if mutation_count != 0 => { buffer.update(cx, |buffer, cx| { @@ -1280,8 +1304,9 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { }); mutation_count -= 1; } - _ if network.has_unreceived(replica_id) => { + _ if network.borrow().has_unreceived(replica_id) => { let ops = network + .borrow_mut() .receive(replica_id) .into_iter() .map(|op| proto::deserialize_operation(op).unwrap()); @@ -1297,14 +1322,6 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { _ => {} } - buffer.update(cx, |buffer, _| { - let ops = buffer - .operations - .drain(..) - .map(|op| proto::serialize_operation(&op)) - .collect(); - network.broadcast(buffer.replica_id(), ops); - }); now += Duration::from_millis(rng.gen_range(0..=200)); buffers.extend(new_buffer); @@ -1312,7 +1329,7 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { buffer.read(cx).check_invariants(); } - if mutation_count == 0 && network.is_idle() { + if mutation_count == 0 && network.borrow().is_idle() { break; } } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 694755274d..f517560d95 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -17,8 +17,8 @@ use gpui::{ use language::{ proto::{deserialize_anchor, deserialize_version, serialize_anchor, serialize_version}, range_from_lsp, Anchor, AnchorRangeExt, Bias, Buffer, CodeAction, CodeLabel, Completion, - Diagnostic, DiagnosticEntry, File as _, Language, LanguageRegistry, Operation, PointUtf16, - ToLspPosition, ToOffset, ToPointUtf16, Transaction, + Diagnostic, DiagnosticEntry, Event as BufferEvent, File as _, Language, LanguageRegistry, + Operation, PointUtf16, ToLspPosition, ToOffset, ToPointUtf16, Transaction, }; use lsp::{DiagnosticSeverity, DocumentHighlightKind, LanguageServer}; use lsp_command::*; @@ -945,10 +945,37 @@ impl Project { remote_id ))?, } - self.assign_language_to_buffer(&buffer, worktree, cx); + cx.become_delegate(buffer, Self::on_buffer_event).detach(); + self.assign_language_to_buffer(buffer, worktree, cx); + Ok(()) } + fn on_buffer_event( + &mut self, + buffer: ModelHandle, + event: BufferEvent, + cx: &mut ModelContext, + ) { + match event { + BufferEvent::Operation(operation) => { + if let Some(project_id) = self.remote_id() { + let request = self.client.request(proto::UpdateBuffer { + project_id, + buffer_id: buffer.read(cx).remote_id(), + operations: vec![language::proto::serialize_operation(&operation)], + }); + cx.foreground() + .spawn(async move { + request.await.log_err(); + }) + .detach(); + } + } + _ => {} + } + } + fn assign_language_to_buffer( &mut self, buffer: &ModelHandle, @@ -4452,7 +4479,10 @@ mod tests { buffer1.update(cx, |buffer, cx| { cx.subscribe(&buffer1, { let events = events.clone(); - move |_, _, event, _| events.borrow_mut().push(event.clone()) + move |_, _, event, _| match event { + BufferEvent::Operation(_) => {} + _ => events.borrow_mut().push(event.clone()), + } }) .detach(); diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 78dac23681..8761c7b80e 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -19,7 +19,7 @@ use gpui::{ }; use language::{ proto::{deserialize_version, serialize_version}, - Buffer, DiagnosticEntry, Operation, PointUtf16, Rope, + Buffer, DiagnosticEntry, PointUtf16, Rope, }; use lazy_static::lazy_static; use parking_lot::Mutex; @@ -71,7 +71,6 @@ pub struct LocalWorktree { share: Option, diagnostics: HashMap, Vec>>, diagnostic_summaries: TreeMap, - queued_operations: Vec<(u64, Operation)>, client: Arc, fs: Arc, visible: bool, @@ -84,7 +83,6 @@ pub struct RemoteWorktree { client: Arc, updates_tx: UnboundedSender, replica_id: ReplicaId, - queued_operations: Vec<(u64, Operation)>, diagnostic_summaries: TreeMap, visible: bool, } @@ -226,7 +224,6 @@ impl Worktree { snapshot_rx: snapshot_rx.clone(), updates_tx, client: client.clone(), - queued_operations: Default::default(), diagnostic_summaries: TreeMap::from_ordered_entries( worktree.diagnostic_summaries.into_iter().map(|summary| { ( @@ -420,42 +417,6 @@ impl Worktree { cx.notify(); } - - fn send_buffer_update( - &mut self, - buffer_id: u64, - operation: Operation, - cx: &mut ModelContext, - ) { - if let Some((project_id, rpc)) = match self { - Worktree::Local(worktree) => worktree - .share - .as_ref() - .map(|share| (share.project_id, worktree.client.clone())), - Worktree::Remote(worktree) => Some((worktree.project_id, worktree.client.clone())), - } { - cx.spawn(|worktree, mut cx| async move { - if let Err(error) = rpc - .request(proto::UpdateBuffer { - project_id, - buffer_id, - operations: vec![language::proto::serialize_operation(&operation)], - }) - .await - { - worktree.update(&mut cx, |worktree, _| { - log::error!("error sending buffer operation: {}", error); - match worktree { - Worktree::Local(t) => &mut t.queued_operations, - Worktree::Remote(t) => &mut t.queued_operations, - } - .push((buffer_id, operation)); - }); - } - }) - .detach(); - } - } } impl LocalWorktree { @@ -526,7 +487,6 @@ impl LocalWorktree { poll_task: None, diagnostics: Default::default(), diagnostic_summaries: Default::default(), - queued_operations: Default::default(), client, fs, visible, @@ -1455,12 +1415,6 @@ impl language::File for File { }) } - fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) { - self.worktree.update(cx, |worktree, cx| { - worktree.send_buffer_update(buffer_id, operation, cx); - }); - } - fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) { self.worktree.update(cx, |worktree, _| { if let Worktree::Remote(worktree) = worktree { diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index d05c9e2401..7f1808fb08 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1034,6 +1034,7 @@ mod tests { use project::{ fs::{FakeFs, Fs as _}, search::SearchQuery, + worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath, }; use rand::prelude::*; @@ -1411,6 +1412,8 @@ mod tests { buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty())); buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await; + worktree_a.flush_fs_events(cx_a).await; + // Make changes on host's file system, see those changes on guest worktrees. fs.rename( "/a/file1".as_ref(),