From 1aec691b35f8208cba6dad8aadc3a13a04980fb8 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 20 Dec 2022 12:03:43 +0100 Subject: [PATCH] Sketch out project reconnection routine on the server --- crates/collab/src/db.rs | 111 ++++++++++++++++++---- crates/collab/src/rpc.rs | 190 ++++++++++++++++++++++++++++++------- crates/rpc/proto/zed.proto | 14 ++- crates/rpc/src/proto.rs | 2 + 4 files changed, 257 insertions(+), 60 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 6e9c365f0f..6679922855 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1319,15 +1319,7 @@ impl Database { Condition::all() .add(room_participant::Column::RoomId.eq(room_id)) .add(room_participant::Column::UserId.eq(user_id)) - .add( - Condition::any() - .add(room_participant::Column::AnsweringConnectionId.is_null()) - .add(room_participant::Column::AnsweringConnectionLost.eq(true)) - .add( - room_participant::Column::AnsweringConnectionServerId - .ne(connection.owner_id as i32), - ), - ), + .add(room_participant::Column::AnsweringConnectionId.is_null()), ) .set(room_participant::ActiveModel { answering_connection_id: ActiveValue::set(Some(connection.id as i32)), @@ -1349,6 +1341,15 @@ impl Database { .await } + pub async fn rejoin_room( + &self, + room_id: proto::RejoinRoom, + user_id: UserId, + connection_id: ConnectionId, + ) -> Result { + todo!() + } + pub async fn leave_room( &self, connection: ConnectionId, @@ -2287,7 +2288,18 @@ impl Database { let room_id = project.room_id; let project = Project { - collaborators, + collaborators: collaborators + .into_iter() + .map(|collaborator| ProjectCollaborator { + connection_id: ConnectionId { + owner_id: collaborator.connection_server_id.0 as u32, + id: collaborator.connection_id as u32, + }, + user_id: collaborator.user_id, + replica_id: collaborator.replica_id, + is_host: collaborator.is_host, + }) + .collect(), worktrees, language_servers: language_servers .into_iter() @@ -2354,8 +2366,8 @@ impl Database { pub async fn project_collaborators( &self, project_id: ProjectId, - connection: ConnectionId, - ) -> Result>> { + connection_id: ConnectionId, + ) -> Result>> { self.room_transaction(|tx| async move { let project = project::Entity::find_by_id(project_id) .one(&*tx) @@ -2364,15 +2376,23 @@ impl Database { let collaborators = project_collaborator::Entity::find() .filter(project_collaborator::Column::ProjectId.eq(project_id)) .all(&*tx) - .await?; + .await? + .into_iter() + .map(|collaborator| ProjectCollaborator { + connection_id: ConnectionId { + owner_id: collaborator.connection_server_id.0 as u32, + id: collaborator.connection_id as u32, + }, + user_id: collaborator.user_id, + replica_id: collaborator.replica_id, + is_host: collaborator.is_host, + }) + .collect::>(); - if collaborators.iter().any(|collaborator| { - let collaborator_connection = ConnectionId { - owner_id: collaborator.connection_server_id.0 as u32, - id: collaborator.connection_id as u32, - }; - collaborator_connection == connection - }) { + if collaborators + .iter() + .any(|collaborator| collaborator.connection_id == connection_id) + { Ok((project.room_id, collaborators)) } else { Err(anyhow!("no such project"))? @@ -2846,6 +2866,38 @@ id_type!(ServerId); id_type!(SignupId); id_type!(UserId); +pub struct RejoinedRoom { + pub room: proto::Room, + pub rejoined_projects: Vec, + pub reshared_projects: Vec, +} + +pub struct ResharedProject { + pub id: ProjectId, + pub old_connection_id: ConnectionId, + pub collaborators: Vec, +} + +pub struct RejoinedProject { + pub id: ProjectId, + pub old_connection_id: ConnectionId, + pub collaborators: Vec, + pub worktrees: Vec, + pub language_servers: Vec, +} + +pub struct RejoinedWorktree { + pub id: u64, + pub abs_path: String, + pub root_name: String, + pub visible: bool, + pub updated_entries: Vec, + pub removed_entries: Vec, + pub diagnostic_summaries: Vec, + pub scan_id: u64, + pub is_complete: bool, +} + pub struct LeftRoom { pub room: proto::Room, pub left_projects: HashMap, @@ -2859,11 +2911,28 @@ pub struct RefreshedRoom { } pub struct Project { - pub collaborators: Vec, + pub collaborators: Vec, pub worktrees: BTreeMap, pub language_servers: Vec, } +pub struct ProjectCollaborator { + pub connection_id: ConnectionId, + pub user_id: UserId, + pub replica_id: ReplicaId, + pub is_host: bool, +} + +impl ProjectCollaborator { + pub fn to_proto(&self) -> proto::Collaborator { + proto::Collaborator { + peer_id: Some(self.connection_id.into()), + replica_id: self.replica_id.0 as u32, + user_id: self.user_id.to_proto(), + } + } +} + pub struct LeftProject { pub id: ProjectId, pub host_user_id: UserId, diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 03e6eb50e2..beeb666da6 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -184,6 +184,7 @@ impl Server { .add_request_handler(ping) .add_request_handler(create_room) .add_request_handler(join_room) + .add_request_handler(rejoin_room) .add_message_handler(leave_room) .add_request_handler(call) .add_request_handler(cancel_call) @@ -941,6 +942,148 @@ async fn join_room( Ok(()) } +async fn rejoin_room( + request: proto::RejoinRoom, + response: Response, + session: Session, +) -> Result<()> { + let mut rejoined_room = session + .db() + .await + .rejoin_room(request, session.user_id, session.connection_id) + .await?; + + response.send(proto::RejoinRoomResponse { + room: Some(rejoined_room.room.clone()), + reshared_projects: rejoined_room + .reshared_projects + .iter() + .map(|project| proto::ResharedProject { + id: project.id.to_proto(), + collaborators: project + .collaborators + .iter() + .map(|collaborator| collaborator.to_proto()) + .collect(), + }) + .collect(), + rejoined_projects: rejoined_room + .rejoined_projects + .iter() + .map(|rejoined_project| proto::RejoinedProject { + id: rejoined_project.id.to_proto(), + worktrees: rejoined_project + .worktrees + .iter() + .map(|worktree| proto::WorktreeMetadata { + id: worktree.id, + root_name: worktree.root_name.clone(), + visible: worktree.visible, + abs_path: worktree.abs_path.clone(), + }) + .collect(), + collaborators: rejoined_project + .collaborators + .iter() + .map(|collaborator| collaborator.to_proto()) + .collect(), + language_servers: rejoined_project.language_servers.clone(), + }) + .collect(), + })?; + room_updated(&rejoined_room.room, &session.peer); + + // Notify other participants about this peer's reconnection to projects. + for project in &rejoined_room.reshared_projects { + for collaborator in &project.collaborators { + if collaborator.connection_id != session.connection_id { + session + .peer + .send( + collaborator.connection_id, + proto::UpdateProjectCollaborator { + project_id: project.id.to_proto(), + old_peer_id: Some(project.old_connection_id.into()), + new_peer_id: Some(session.connection_id.into()), + }, + ) + .trace_err(); + } + } + } + for project in &rejoined_room.rejoined_projects { + for collaborator in &project.collaborators { + if collaborator.connection_id != session.connection_id { + session + .peer + .send( + collaborator.connection_id, + proto::UpdateProjectCollaborator { + project_id: project.id.to_proto(), + old_peer_id: Some(project.old_connection_id.into()), + new_peer_id: Some(session.connection_id.into()), + }, + ) + .trace_err(); + } + } + } + + for project in &mut rejoined_room.rejoined_projects { + for worktree in mem::take(&mut project.worktrees) { + #[cfg(any(test, feature = "test-support"))] + const MAX_CHUNK_SIZE: usize = 2; + #[cfg(not(any(test, feature = "test-support")))] + const MAX_CHUNK_SIZE: usize = 256; + + // Stream this worktree's entries. + let message = proto::UpdateWorktree { + project_id: project.id.to_proto(), + worktree_id: worktree.id, + abs_path: worktree.abs_path.clone(), + root_name: worktree.root_name, + updated_entries: worktree.updated_entries, + removed_entries: worktree.removed_entries, + scan_id: worktree.scan_id, + is_last_update: worktree.is_complete, + }; + for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) { + session.peer.send(session.connection_id, update.clone())?; + } + + // Stream this worktree's diagnostics. + for summary in worktree.diagnostic_summaries { + session.peer.send( + session.connection_id, + proto::UpdateDiagnosticSummary { + project_id: project.id.to_proto(), + worktree_id: worktree.id, + summary: Some(summary), + }, + )?; + } + } + + for language_server in &project.language_servers { + session.peer.send( + session.connection_id, + proto::UpdateLanguageServer { + project_id: project.id.to_proto(), + language_server_id: language_server.id, + variant: Some( + proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated( + proto::LspDiskBasedDiagnosticsUpdated {}, + ), + ), + }, + )?; + } + } + + update_user_contacts(session.user_id, &session).await?; + Ok(()) +} + async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> { leave_room_for_session(&session).await } @@ -1160,18 +1303,8 @@ async fn join_project( let collaborators = project .collaborators .iter() - .map(|collaborator| { - let peer_id = proto::PeerId { - owner_id: collaborator.connection_server_id.0 as u32, - id: collaborator.connection_id as u32, - }; - proto::Collaborator { - peer_id: Some(peer_id), - replica_id: collaborator.replica_id.0 as u32, - user_id: collaborator.user_id.to_proto(), - } - }) - .filter(|collaborator| collaborator.peer_id != Some(session.connection_id.into())) + .filter(|collaborator| collaborator.connection_id != session.connection_id) + .map(|collaborator| collaborator.to_proto()) .collect::>(); let worktrees = project .worktrees @@ -1413,14 +1546,11 @@ where .await .project_collaborators(project_id, session.connection_id) .await?; - let host = collaborators + collaborators .iter() .find(|collaborator| collaborator.is_host) - .ok_or_else(|| anyhow!("host not found"))?; - ConnectionId { - owner_id: host.connection_server_id.0 as u32, - id: host.connection_id as u32, - } + .ok_or_else(|| anyhow!("host not found"))? + .connection_id }; let payload = session @@ -1444,14 +1574,11 @@ async fn save_buffer( .await .project_collaborators(project_id, session.connection_id) .await?; - let host = collaborators + collaborators .iter() .find(|collaborator| collaborator.is_host) - .ok_or_else(|| anyhow!("host not found"))?; - ConnectionId { - owner_id: host.connection_server_id.0 as u32, - id: host.connection_id as u32, - } + .ok_or_else(|| anyhow!("host not found"))? + .connection_id }; let response_payload = session .peer @@ -1463,17 +1590,10 @@ async fn save_buffer( .await .project_collaborators(project_id, session.connection_id) .await?; - collaborators.retain(|collaborator| { - let collaborator_connection = ConnectionId { - owner_id: collaborator.connection_server_id.0 as u32, - id: collaborator.connection_id as u32, - }; - collaborator_connection != session.connection_id - }); - let project_connection_ids = collaborators.iter().map(|collaborator| ConnectionId { - owner_id: collaborator.connection_server_id.0 as u32, - id: collaborator.connection_id as u32, - }); + collaborators.retain(|collaborator| collaborator.connection_id != session.connection_id); + let project_connection_ids = collaborators + .iter() + .map(|collaborator| collaborator.connection_id); broadcast(host_connection_id, project_connection_ids, |conn_id| { session .peer diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index b3322b2923..51cc8aa6cd 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -39,6 +39,7 @@ message Envelope { JoinProjectResponse join_project_response = 25; LeaveProject leave_project = 26; AddProjectCollaborator add_project_collaborator = 27; + UpdateProjectCollaborator update_project_collaborator = 110; RemoveProjectCollaborator remove_project_collaborator = 28; GetDefinition get_definition = 29; @@ -193,10 +194,9 @@ message ResharedProject { message RejoinedProject { uint64 id = 1; - uint32 replica_id = 2; - repeated WorktreeMetadata worktrees = 3; - repeated Collaborator collaborators = 4; - repeated LanguageServer language_servers = 5; + repeated WorktreeMetadata worktrees = 2; + repeated Collaborator collaborators = 3; + repeated LanguageServer language_servers = 4; } message LeaveRoom {} @@ -360,6 +360,12 @@ message AddProjectCollaborator { Collaborator collaborator = 2; } +message UpdateProjectCollaborator { + uint64 project_id = 1; + PeerId old_peer_id = 2; + PeerId new_peer_id = 3; +} + message RemoveProjectCollaborator { uint64 project_id = 1; PeerId peer_id = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index ca70b7dbd9..b2017b839a 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -219,6 +219,7 @@ messages!( (UpdateLanguageServer, Foreground), (UpdateParticipantLocation, Foreground), (UpdateProject, Foreground), + (UpdateProjectCollaborator, Foreground), (UpdateWorktree, Foreground), (UpdateDiffBase, Background), (GetPrivateUserInfo, Foreground), @@ -322,6 +323,7 @@ entity_messages!( UpdateFollowers, UpdateLanguageServer, UpdateProject, + UpdateProjectCollaborator, UpdateWorktree, UpdateDiffBase );