diff --git a/zed-rpc/proto/zed.proto b/zed-rpc/proto/zed.proto index 20fab30a47..a21ee3e464 100644 --- a/zed-rpc/proto/zed.proto +++ b/zed-rpc/proto/zed.proto @@ -16,7 +16,8 @@ message Envelope { OpenBufferResponse open_buffer_response = 11; CloseBuffer close_buffer = 12; UpdateBuffer update_buffer = 13; - RemoveGuest remove_guest = 14; + AddGuest add_guest = 14; + RemoveGuest remove_guest = 15; } } @@ -45,19 +46,20 @@ message OpenWorktree { message OpenWorktreeResponse { Worktree worktree = 1; - optional uint32 replica_id = 2; + uint32 replica_id = 2; + uint32 host_peer_id = 3; } message AddGuest { uint64 worktree_id = 1; - uint32 replica_id = 2; - User user = 3; + uint32 peer_id = 2; + uint32 replica_id = 3; + User user = 4; } message RemoveGuest { uint64 worktree_id = 1; uint32 peer_id = 2; - uint32 replica_id = 3; } message OpenBuffer { diff --git a/zed-rpc/src/proto.rs b/zed-rpc/src/proto.rs index abcceaa5d4..40f530d550 100644 --- a/zed-rpc/src/proto.rs +++ b/zed-rpc/src/proto.rs @@ -74,6 +74,7 @@ request_message!(OpenWorktree, OpenWorktreeResponse); request_message!(OpenBuffer, OpenBufferResponse); message!(CloseBuffer); message!(UpdateBuffer); +message!(AddGuest); message!(RemoveGuest); /// A stream of protobuf messages. diff --git a/zed/src/editor/buffer.rs b/zed/src/editor/buffer.rs index be4d8c0110..dadafa0fc6 100644 --- a/zed/src/editor/buffer.rs +++ b/zed/src/editor/buffer.rs @@ -1494,7 +1494,7 @@ impl Buffer { self.operations.push(operation); } - pub fn peer_left(&mut self, replica_id: ReplicaId, cx: &mut ModelContext) { + pub fn remove_guest(&mut self, replica_id: ReplicaId, cx: &mut ModelContext) { self.selections .retain(|set_id, _| set_id.replica_id != replica_id); cx.notify(); diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index cd13a83da3..2622c24b06 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -37,6 +37,7 @@ use std::{ fmt, fs, future::Future, io, + iter::FromIterator, ops::Deref, os::unix::fs::MetadataExt, path::{Path, PathBuf}, @@ -53,6 +54,7 @@ lazy_static! { } pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) { + rpc.on_message(remote::add_guest, cx); rpc.on_message(remote::remove_guest, cx); rpc.on_message(remote::open_buffer, cx); rpc.on_message(remote::close_buffer, cx); @@ -100,16 +102,16 @@ impl Worktree { let worktree_message = open_worktree_response .worktree .ok_or_else(|| anyhow!("empty worktree"))?; - let replica_id = open_worktree_response - .replica_id - .ok_or_else(|| anyhow!("empty replica id"))?; + let replica_id = open_worktree_response.replica_id; + let host_peer_id = PeerId(open_worktree_response.host_peer_id); let worktree = cx.update(|cx| { cx.add_model(|cx| { Worktree::Remote(RemoteWorktree::new( id, + replica_id as ReplicaId, + host_peer_id, worktree_message, rpc.clone(), - replica_id as ReplicaId, languages, cx, )) @@ -154,6 +156,17 @@ impl Worktree { } } + pub fn add_guest( + &mut self, + envelope: TypedEnvelope, + cx: &mut ModelContext, + ) -> Result<()> { + match self { + Worktree::Local(worktree) => worktree.add_guest(envelope, cx), + Worktree::Remote(worktree) => worktree.add_guest(envelope, cx), + } + } + pub fn remove_guest( &mut self, envelope: TypedEnvelope, @@ -161,7 +174,7 @@ impl Worktree { ) -> Result<()> { match self { Worktree::Local(worktree) => worktree.remove_guest(envelope, cx), - Worktree::Remote(_) => todo!(), + Worktree::Remote(worktree) => worktree.remove_guest(envelope, cx), } } @@ -261,6 +274,7 @@ pub struct LocalWorktree { rpc: Option<(rpc::Client, u64)>, open_buffers: HashMap>, shared_buffers: HashMap>>, + peers: HashMap, languages: Arc, } @@ -298,6 +312,7 @@ impl LocalWorktree { poll_scheduled: false, open_buffers: Default::default(), shared_buffers: Default::default(), + peers: Default::default(), rpc: None, languages, }; @@ -422,12 +437,34 @@ impl LocalWorktree { Ok(()) } + pub fn add_guest( + &mut self, + envelope: TypedEnvelope, + cx: &mut ModelContext, + ) -> Result<()> { + self.peers.insert( + PeerId(envelope.payload.peer_id), + envelope.payload.replica_id as ReplicaId, + ); + Ok(()) + } + pub fn remove_guest( &mut self, envelope: TypedEnvelope, cx: &mut ModelContext, ) -> Result<()> { - self.shared_buffers.remove(&envelope.original_sender_id()?); + let peer_id = PeerId(envelope.payload.peer_id); + let replica_id = self + .peers + .remove(&peer_id) + .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?; + self.shared_buffers.remove(&peer_id); + for (_, buffer) in &self.open_buffers { + if let Some(buffer) = buffer.upgrade(&cx) { + buffer.update(cx, |buffer, cx| buffer.remove_guest(replica_id, cx)); + } + } Ok(()) } @@ -715,15 +752,17 @@ pub struct RemoteWorktree { rpc: rpc::Client, replica_id: ReplicaId, open_buffers: HashMap>, + peers: HashMap, languages: Arc, } impl RemoteWorktree { fn new( remote_id: u64, + replica_id: ReplicaId, + host_peer_id: PeerId, worktree: proto::Worktree, rpc: rpc::Client, - replica_id: ReplicaId, languages: Arc, cx: &mut ModelContext, ) -> Self { @@ -779,6 +818,7 @@ impl RemoteWorktree { rpc, replica_id, open_buffers: Default::default(), + peers: HashMap::from_iter(Some((host_peer_id, 0))), languages, } } @@ -837,6 +877,36 @@ impl RemoteWorktree { } }) } + + pub fn add_guest( + &mut self, + envelope: TypedEnvelope, + cx: &mut ModelContext, + ) -> Result<()> { + self.peers.insert( + PeerId(envelope.payload.peer_id), + envelope.payload.replica_id as ReplicaId, + ); + Ok(()) + } + + pub fn remove_guest( + &mut self, + envelope: TypedEnvelope, + cx: &mut ModelContext, + ) -> Result<()> { + let peer_id = PeerId(envelope.payload.peer_id); + let replica_id = self + .peers + .remove(&peer_id) + .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?; + for (_, buffer) in &self.open_buffers { + if let Some(buffer) = buffer.upgrade(&cx) { + buffer.update(cx, |buffer, cx| buffer.remove_guest(replica_id, cx)); + } + } + Ok(()) + } } #[derive(Clone)] @@ -1938,6 +2008,18 @@ impl<'a> Iterator for ChildEntriesIter<'a> { mod remote { use super::*; + pub async fn add_guest( + envelope: TypedEnvelope, + rpc: &rpc::Client, + cx: &mut AsyncAppContext, + ) -> anyhow::Result<()> { + rpc.state + .lock() + .await + .shared_worktree(envelope.payload.worktree_id, cx)? + .update(cx, |worktree, cx| worktree.add_guest(envelope, cx)) + } + pub async fn remove_guest( envelope: TypedEnvelope, rpc: &rpc::Client,