diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 4164424a2b..dab7df3e67 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -187,6 +187,7 @@ impl Server { .add_request_handler(Server::forward_project_request::) .add_request_handler(Server::forward_project_request::) .add_request_handler(Server::forward_project_request::) + .add_message_handler(Server::create_buffer_for_peer) .add_request_handler(Server::update_buffer) .add_message_handler(Server::update_buffer_file) .add_message_handler(Server::buffer_reloaded) @@ -1186,6 +1187,18 @@ impl Server { Ok(()) } + async fn create_buffer_for_peer( + self: Arc, + request: TypedEnvelope, + ) -> Result<()> { + self.peer.forward_send( + request.sender_id, + ConnectionId(request.payload.peer_id), + request.payload, + )?; + Ok(()) + } + async fn update_buffer( self: Arc, request: TypedEnvelope, diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index c8180e0a8f..7c616762d8 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -358,7 +358,7 @@ impl Buffer { pub fn from_proto( replica_id: ReplicaId, - message: proto::BufferState, + message: proto::Buffer, file: Option>, cx: &mut ModelContext, ) -> Result { @@ -406,7 +406,7 @@ impl Buffer { Ok(this) } - pub fn to_proto(&self) -> proto::BufferState { + pub fn to_proto(&self) -> proto::Buffer { let mut operations = self .text .history() @@ -414,7 +414,7 @@ impl Buffer { .chain(self.deferred_ops.iter().map(proto::serialize_operation)) .collect::>(); operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation); - proto::BufferState { + proto::Buffer { id: self.remote_id(), file: self.file.as_ref().map(|f| f.to_proto()), base_text: self.base_text().to_string(), diff --git a/crates/language/src/proto.rs b/crates/language/src/proto.rs index df1e0e0959..c01973b1c6 100644 --- a/crates/language/src/proto.rs +++ b/crates/language/src/proto.rs @@ -9,7 +9,7 @@ use rpc::proto; use std::{ops::Range, sync::Arc}; use text::*; -pub use proto::{Buffer, BufferState, LineEnding, SelectionSet}; +pub use proto::{Buffer, LineEnding, SelectionSet}; pub fn deserialize_line_ending(message: proto::LineEnding) -> text::LineEnding { match message { diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index 7737a4df9e..37f6e76340 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -522,11 +522,10 @@ async fn location_links_from_proto( for link in proto_links { let origin = match link.origin { Some(origin) => { - let buffer = origin - .buffer - .ok_or_else(|| anyhow!("missing origin buffer"))?; let buffer = project - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.wait_for_buffer(origin.buffer_id, cx) + }) .await?; let start = origin .start @@ -548,9 +547,10 @@ async fn location_links_from_proto( }; let target = link.target.ok_or_else(|| anyhow!("missing target"))?; - let buffer = target.buffer.ok_or_else(|| anyhow!("missing buffer"))?; let buffer = project - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.wait_for_buffer(target.buffer_id, cx) + }) .await?; let start = target .start @@ -664,19 +664,19 @@ fn location_links_to_proto( .into_iter() .map(|definition| { let origin = definition.origin.map(|origin| { - let buffer = project.serialize_buffer_for_peer(&origin.buffer, peer_id, cx); + let buffer_id = project.create_buffer_for_peer(&origin.buffer, peer_id, cx); proto::Location { start: Some(serialize_anchor(&origin.range.start)), end: Some(serialize_anchor(&origin.range.end)), - buffer: Some(buffer), + buffer_id, } }); - let buffer = project.serialize_buffer_for_peer(&definition.target.buffer, peer_id, cx); + let buffer_id = project.create_buffer_for_peer(&definition.target.buffer, peer_id, cx); let target = proto::Location { start: Some(serialize_anchor(&definition.target.range.start)), end: Some(serialize_anchor(&definition.target.range.end)), - buffer: Some(buffer), + buffer_id, }; proto::LocationLink { @@ -792,11 +792,11 @@ impl LspCommand for GetReferences { let locations = response .into_iter() .map(|definition| { - let buffer = project.serialize_buffer_for_peer(&definition.buffer, peer_id, cx); + let buffer_id = project.create_buffer_for_peer(&definition.buffer, peer_id, cx); proto::Location { start: Some(serialize_anchor(&definition.range.start)), end: Some(serialize_anchor(&definition.range.end)), - buffer: Some(buffer), + buffer_id, } }) .collect(); @@ -812,9 +812,10 @@ impl LspCommand for GetReferences { ) -> Result> { let mut locations = Vec::new(); for location in message.locations { - let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?; let target_buffer = project - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.wait_for_buffer(location.buffer_id, cx) + }) .await?; let start = location .start diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 31d18f4fa8..0f762f822f 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -112,7 +112,7 @@ pub struct Project { collaborators: HashMap, client_subscriptions: Vec, _subscriptions: Vec, - opened_buffer: (Rc>>, watch::Receiver<()>), + opened_buffer: (watch::Sender<()>, watch::Receiver<()>), shared_buffers: HashMap>, #[allow(clippy::type_complexity)] loading_buffers: HashMap< @@ -375,6 +375,7 @@ impl Project { client.add_model_message_handler(Self::handle_update_project); client.add_model_message_handler(Self::handle_unregister_project); client.add_model_message_handler(Self::handle_project_unshared); + client.add_model_message_handler(Self::handle_create_buffer_for_peer); client.add_model_message_handler(Self::handle_update_buffer_file); client.add_model_message_handler(Self::handle_update_buffer); client.add_model_message_handler(Self::handle_update_diagnostic_summary); @@ -454,7 +455,6 @@ impl Project { let handle = cx.weak_handle(); project_store.update(cx, |store, cx| store.add_project(handle, cx)); - let (opened_buffer_tx, opened_buffer_rx) = watch::channel(); Self { worktrees: Default::default(), collaborators: Default::default(), @@ -472,7 +472,7 @@ impl Project { _maintain_remote_id, _maintain_online_status, }, - opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx), + opened_buffer: watch::channel(), client_subscriptions: Vec::new(), _subscriptions: vec![cx.observe_global::(Self::on_settings_changed)], _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx), @@ -540,7 +540,6 @@ impl Project { worktrees.push(worktree); } - let (opened_buffer_tx, opened_buffer_rx) = watch::channel(); let this = cx.add_model(|cx: &mut ModelContext| { let handle = cx.weak_handle(); project_store.update(cx, |store, cx| store.add_project(handle, cx)); @@ -548,7 +547,7 @@ impl Project { let mut this = Self { worktrees: Vec::new(), loading_buffers: Default::default(), - opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx), + opened_buffer: watch::channel(), shared_buffers: Default::default(), loading_local_worktrees: Default::default(), active_entry: None, @@ -1624,9 +1623,10 @@ impl Project { path: path_string, }) .await?; - let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?; - this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) - .await + this.update(&mut cx, |this, cx| { + this.wait_for_buffer(response.buffer_id, cx) + }) + .await }) } @@ -1684,11 +1684,8 @@ impl Project { .client .request(proto::OpenBufferById { project_id, id }); cx.spawn(|this, mut cx| async move { - let buffer = request - .await? - .buffer - .ok_or_else(|| anyhow!("invalid buffer"))?; - this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + let buffer_id = request.await?.buffer_id; + this.update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx)) .await }) } else { @@ -1800,6 +1797,7 @@ impl Project { }) .detach(); + *self.opened_buffer.0.borrow_mut() = (); Ok(()) } @@ -3476,9 +3474,10 @@ impl Project { }); cx.spawn(|this, mut cx| async move { let response = request.await?; - let buffer = response.buffer.ok_or_else(|| anyhow!("invalid buffer"))?; - this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) - .await + this.update(&mut cx, |this, cx| { + this.wait_for_buffer(response.buffer_id, cx) + }) + .await }) } else { Task::ready(Err(anyhow!("project does not have a remote id"))) @@ -4294,9 +4293,10 @@ impl Project { let response = request.await?; let mut result = HashMap::default(); for location in response.locations { - let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?; let target_buffer = this - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.wait_for_buffer(location.buffer_id, cx) + }) .await?; let start = location .start @@ -5107,6 +5107,36 @@ impl Project { }) } + async fn handle_create_buffer_for_peer( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, cx| { + let mut buffer = envelope + .payload + .buffer + .ok_or_else(|| anyhow!("invalid buffer"))?; + let mut buffer_file = None; + if let Some(file) = buffer.file.take() { + let worktree_id = WorktreeId::from_proto(file.worktree_id); + let worktree = this + .worktree_for_id(worktree_id, cx) + .ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?; + buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) + as Arc); + } + + let buffer = cx.add_model(|cx| { + Buffer::from_proto(this.replica_id(), buffer, buffer_file, cx).unwrap() + }); + this.register_buffer(&buffer, cx)?; + + Ok(()) + }) + } + async fn handle_update_buffer_file( this: ModelHandle, envelope: TypedEnvelope, @@ -5448,9 +5478,9 @@ impl Project { for range in ranges { let start = serialize_anchor(&range.start); let end = serialize_anchor(&range.end); - let buffer = this.serialize_buffer_for_peer(&buffer, peer_id, cx); + let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx); locations.push(proto::Location { - buffer: Some(buffer), + buffer_id, start: Some(start), end: Some(end), }); @@ -5487,9 +5517,9 @@ impl Project { .await?; Ok(proto::OpenBufferForSymbolResponse { - buffer: Some(this.update(&mut cx, |this, cx| { - this.serialize_buffer_for_peer(&buffer, peer_id, cx) - })), + buffer_id: this.update(&mut cx, |this, cx| { + this.create_buffer_for_peer(&buffer, peer_id, cx) + }), }) } @@ -5515,7 +5545,7 @@ impl Project { .await?; this.update(&mut cx, |this, cx| { Ok(proto::OpenBufferResponse { - buffer: Some(this.serialize_buffer_for_peer(&buffer, peer_id, cx)), + buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx), }) }) } @@ -5541,7 +5571,7 @@ impl Project { let buffer = open_buffer.await?; this.update(&mut cx, |this, cx| { Ok(proto::OpenBufferResponse { - buffer: Some(this.serialize_buffer_for_peer(&buffer, peer_id, cx)), + buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx), }) }) } @@ -5553,13 +5583,13 @@ impl Project { cx: &AppContext, ) -> proto::ProjectTransaction { let mut serialized_transaction = proto::ProjectTransaction { - buffers: Default::default(), + buffer_ids: Default::default(), transactions: Default::default(), }; for (buffer, transaction) in project_transaction.0 { serialized_transaction - .buffers - .push(self.serialize_buffer_for_peer(&buffer, peer_id, cx)); + .buffer_ids + .push(self.create_buffer_for_peer(&buffer, peer_id, cx)); serialized_transaction .transactions .push(language::proto::serialize_transaction(&transaction)); @@ -5575,9 +5605,10 @@ impl Project { ) -> Task> { cx.spawn(|this, mut cx| async move { let mut project_transaction = ProjectTransaction::default(); - for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) { + for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) + { let buffer = this - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx)) .await?; let transaction = language::proto::deserialize_transaction(transaction)?; project_transaction.0.insert(buffer, transaction); @@ -5601,81 +5632,51 @@ impl Project { }) } - fn serialize_buffer_for_peer( + fn create_buffer_for_peer( &mut self, buffer: &ModelHandle, peer_id: PeerId, cx: &AppContext, - ) -> proto::Buffer { + ) -> u64 { let buffer_id = buffer.read(cx).remote_id(); - let shared_buffers = self.shared_buffers.entry(peer_id).or_default(); - if shared_buffers.insert(buffer_id) { - proto::Buffer { - variant: Some(proto::buffer::Variant::State(buffer.read(cx).to_proto())), - } - } else { - proto::Buffer { - variant: Some(proto::buffer::Variant::Id(buffer_id)), + if let Some(project_id) = self.remote_id() { + let shared_buffers = self.shared_buffers.entry(peer_id).or_default(); + if shared_buffers.insert(buffer_id) { + self.client + .send(proto::CreateBufferForPeer { + project_id, + peer_id: peer_id.0, + buffer: Some(buffer.read(cx).to_proto()), + }) + .log_err(); } } + + buffer_id } - fn deserialize_buffer( - &mut self, - buffer: proto::Buffer, + fn wait_for_buffer( + &self, + id: u64, cx: &mut ModelContext, ) -> Task>> { - let replica_id = self.replica_id(); - - let opened_buffer_tx = self.opened_buffer.0.clone(); let mut opened_buffer_rx = self.opened_buffer.1.clone(); - cx.spawn(|this, mut cx| async move { - match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? { - proto::buffer::Variant::Id(id) => { - let buffer = loop { - let buffer = this.read_with(&cx, |this, cx| { - this.opened_buffers - .get(&id) - .and_then(|buffer| buffer.upgrade(cx)) - }); - if let Some(buffer) = buffer { - break buffer; - } - opened_buffer_rx - .next() - .await - .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?; - }; - Ok(buffer) + cx.spawn(|this, cx| async move { + let buffer = loop { + let buffer = this.read_with(&cx, |this, cx| { + this.opened_buffers + .get(&id) + .and_then(|buffer| buffer.upgrade(cx)) + }); + if let Some(buffer) = buffer { + break buffer; } - proto::buffer::Variant::State(mut buffer) => { - let mut buffer_worktree = None; - let mut buffer_file = None; - if let Some(file) = buffer.file.take() { - this.read_with(&cx, |this, cx| { - let worktree_id = WorktreeId::from_proto(file.worktree_id); - let worktree = - this.worktree_for_id(worktree_id, cx).ok_or_else(|| { - anyhow!("no worktree found for id {}", file.worktree_id) - })?; - buffer_file = - Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) - as Arc); - buffer_worktree = Some(worktree); - Ok::<_, anyhow::Error>(()) - })?; - } - - let buffer = cx.add_model(|cx| { - Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap() - }); - - this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))?; - - *opened_buffer_tx.borrow_mut().borrow_mut() = (); - Ok(buffer) - } - } + opened_buffer_rx + .next() + .await + .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?; + }; + Ok(buffer) }) } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 3da7f62ac7..c803ff85b5 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -55,58 +55,59 @@ message Envelope { OpenBufferById open_buffer_by_id = 44; OpenBufferByPath open_buffer_by_path = 45; OpenBufferResponse open_buffer_response = 46; - UpdateBuffer update_buffer = 47; - UpdateBufferFile update_buffer_file = 48; - SaveBuffer save_buffer = 49; - BufferSaved buffer_saved = 50; - BufferReloaded buffer_reloaded = 51; - ReloadBuffers reload_buffers = 52; - ReloadBuffersResponse reload_buffers_response = 53; - FormatBuffers format_buffers = 54; - FormatBuffersResponse format_buffers_response = 55; - GetCompletions get_completions = 56; - GetCompletionsResponse get_completions_response = 57; - ApplyCompletionAdditionalEdits apply_completion_additional_edits = 58; - ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 59; - GetCodeActions get_code_actions = 60; - GetCodeActionsResponse get_code_actions_response = 61; - GetHover get_hover = 62; - GetHoverResponse get_hover_response = 63; - ApplyCodeAction apply_code_action = 64; - ApplyCodeActionResponse apply_code_action_response = 65; - PrepareRename prepare_rename = 66; - PrepareRenameResponse prepare_rename_response = 67; - PerformRename perform_rename = 68; - PerformRenameResponse perform_rename_response = 69; - SearchProject search_project = 70; - SearchProjectResponse search_project_response = 71; + CreateBufferForPeer create_buffer_for_peer = 47; + UpdateBuffer update_buffer = 48; + UpdateBufferFile update_buffer_file = 49; + SaveBuffer save_buffer = 50; + BufferSaved buffer_saved = 51; + BufferReloaded buffer_reloaded = 52; + ReloadBuffers reload_buffers = 53; + ReloadBuffersResponse reload_buffers_response = 54; + FormatBuffers format_buffers = 55; + FormatBuffersResponse format_buffers_response = 56; + GetCompletions get_completions = 57; + GetCompletionsResponse get_completions_response = 58; + ApplyCompletionAdditionalEdits apply_completion_additional_edits = 59; + ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 60; + GetCodeActions get_code_actions = 61; + GetCodeActionsResponse get_code_actions_response = 62; + GetHover get_hover = 63; + GetHoverResponse get_hover_response = 64; + ApplyCodeAction apply_code_action = 65; + ApplyCodeActionResponse apply_code_action_response = 66; + PrepareRename prepare_rename = 67; + PrepareRenameResponse prepare_rename_response = 68; + PerformRename perform_rename = 69; + PerformRenameResponse perform_rename_response = 70; + SearchProject search_project = 71; + SearchProjectResponse search_project_response = 72; - GetChannels get_channels = 72; - GetChannelsResponse get_channels_response = 73; - JoinChannel join_channel = 74; - JoinChannelResponse join_channel_response = 75; - LeaveChannel leave_channel = 76; - SendChannelMessage send_channel_message = 77; - SendChannelMessageResponse send_channel_message_response = 78; - ChannelMessageSent channel_message_sent = 79; - GetChannelMessages get_channel_messages = 80; - GetChannelMessagesResponse get_channel_messages_response = 81; + GetChannels get_channels = 73; + GetChannelsResponse get_channels_response = 74; + JoinChannel join_channel = 75; + JoinChannelResponse join_channel_response = 76; + LeaveChannel leave_channel = 77; + SendChannelMessage send_channel_message = 78; + SendChannelMessageResponse send_channel_message_response = 79; + ChannelMessageSent channel_message_sent = 80; + GetChannelMessages get_channel_messages = 81; + GetChannelMessagesResponse get_channel_messages_response = 82; - UpdateContacts update_contacts = 82; - UpdateInviteInfo update_invite_info = 83; - ShowContacts show_contacts = 84; + UpdateContacts update_contacts = 83; + UpdateInviteInfo update_invite_info = 84; + ShowContacts show_contacts = 85; - GetUsers get_users = 85; - FuzzySearchUsers fuzzy_search_users = 86; - UsersResponse users_response = 87; - RequestContact request_contact = 88; - RespondToContactRequest respond_to_contact_request = 89; - RemoveContact remove_contact = 90; + GetUsers get_users = 86; + FuzzySearchUsers fuzzy_search_users = 87; + UsersResponse users_response = 88; + RequestContact request_contact = 89; + RespondToContactRequest respond_to_contact_request = 90; + RemoveContact remove_contact = 91; - Follow follow = 91; - FollowResponse follow_response = 92; - UpdateFollowers update_followers = 93; - Unfollow unfollow = 94; + Follow follow = 92; + FollowResponse follow_response = 93; + UpdateFollowers update_followers = 94; + Unfollow unfollow = 95; } } @@ -299,7 +300,7 @@ message GetDocumentHighlightsResponse { } message Location { - Buffer buffer = 1; + uint64 buffer_id = 1; Anchor start = 2; Anchor end = 3; } @@ -348,7 +349,7 @@ message OpenBufferForSymbol { } message OpenBufferForSymbolResponse { - Buffer buffer = 1; + uint64 buffer_id = 1; } message OpenBufferByPath { @@ -363,12 +364,13 @@ message OpenBufferById { } message OpenBufferResponse { - Buffer buffer = 1; + uint64 buffer_id = 1; } -message CloseBuffer { +message CreateBufferForPeer { uint64 project_id = 1; - uint64 buffer_id = 2; + uint32 peer_id = 2; + Buffer buffer = 3; } message UpdateBuffer { @@ -539,7 +541,7 @@ message CodeAction { } message ProjectTransaction { - repeated Buffer buffers = 1; + repeated uint64 buffer_ids = 1; repeated Transaction transactions = 2; } @@ -807,13 +809,6 @@ message Entry { } message Buffer { - oneof variant { - uint64 id = 1; - BufferState state = 2; - } -} - -message BufferState { uint64 id = 1; optional File file = 2; string base_text = 3; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 8cd65d34f1..2ba3fa18ba 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -86,6 +86,7 @@ messages!( (RemoveContact, Foreground), (ChannelMessageSent, Foreground), (CopyProjectEntry, Foreground), + (CreateBufferForPeer, Foreground), (CreateProjectEntry, Foreground), (DeleteProjectEntry, Foreground), (Error, Foreground), @@ -222,6 +223,7 @@ entity_messages!( BufferReloaded, BufferSaved, CopyProjectEntry, + CreateBufferForPeer, CreateProjectEntry, DeleteProjectEntry, Follow,