diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index e814ea42a4..e141ea8865 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -161,11 +161,9 @@ impl Database { // Find the collaborator record for this user's previous lost // connection. Update it with the new connection id. - let server_id = ServerId(connection_id.owner_id as i32); - let Some(self_collaborator) = collaborators.iter_mut().find(|c| { - c.user_id == user_id - && (c.connection_lost || c.connection_server_id != server_id) - }) else { + let Some(self_collaborator) = + collaborators.iter_mut().find(|c| c.user_id == user_id) + else { log::info!("can't rejoin buffer, no previous collaborator found"); continue; }; diff --git a/crates/collab/src/db/queries/rooms.rs b/crates/collab/src/db/queries/rooms.rs index 7bd2d87c80..b5073ae7ab 100644 --- a/crates/collab/src/db/queries/rooms.rs +++ b/crates/collab/src/db/queries/rooms.rs @@ -468,15 +468,7 @@ impl Database { Condition::all() .add(room_participant::Column::RoomId.eq(room_id)) .add(room_participant::Column::UserId.eq(user_id)) - .add(room_participant::Column::AnsweringConnectionId.is_not_null()) - .add( - Condition::any() - .add(room_participant::Column::AnsweringConnectionLost.eq(true)) - .add( - room_participant::Column::AnsweringConnectionServerId - .ne(connection.owner_id as i32), - ), - ), + .add(room_participant::Column::AnsweringConnectionId.is_not_null()), ) .set(room_participant::ActiveModel { answering_connection_id: ActiveValue::set(Some(connection.id as i32)), diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index e3a4f3f17b..550f222840 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -147,7 +147,7 @@ pub struct Server { app_state: Arc, executor: Executor, handlers: HashMap, - teardown: watch::Sender<()>, + teardown: watch::Sender, } pub(crate) struct ConnectionPoolGuard<'a> { @@ -180,7 +180,7 @@ impl Server { executor, connection_pool: Default::default(), handlers: Default::default(), - teardown: watch::channel(()).0, + teardown: watch::channel(false).0, }; server @@ -436,7 +436,7 @@ impl Server { pub fn teardown(&self) { self.peer.teardown(); self.connection_pool.lock().reset(); - let _ = self.teardown.send(()); + let _ = self.teardown.send(true); } #[cfg(test)] @@ -444,6 +444,7 @@ impl Server { self.teardown(); *self.id.lock() = id; self.peer.reset(id.0 as u32); + let _ = self.teardown.send(false); } #[cfg(test)] @@ -561,6 +562,9 @@ impl Server { } let mut teardown = self.teardown.subscribe(); async move { + if *teardown.borrow() { + return Err(anyhow!("server is tearing down"))?; + } let (connection_id, handle_io, mut incoming_rx) = this .peer .add_connection(connection, { @@ -943,7 +947,7 @@ pub async fn handle_metrics(Extension(server): Extension>) -> Result #[instrument(err, skip(executor))] async fn connection_lost( session: Session, - mut teardown: watch::Receiver<()>, + mut teardown: watch::Receiver, executor: Executor, ) -> Result<()> { session.peer.disconnect(session.connection_id); diff --git a/crates/collab/src/tests/following_tests.rs b/crates/collab/src/tests/following_tests.rs index f68040d3e5..8561346e97 100644 --- a/crates/collab/src/tests/following_tests.rs +++ b/crates/collab/src/tests/following_tests.rs @@ -1578,7 +1578,7 @@ async fn test_following_across_workspaces(cx_a: &mut TestAppContext, cx_b: &mut #[gpui::test] async fn test_following_stops_on_unshare(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - let (_, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await; + let (_server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await; let (workspace_a, cx_a) = client_a.build_test_workspace(cx_a).await; client_a @@ -2024,7 +2024,7 @@ async fn test_following_to_channel_notes_other_workspace( cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - let (_, client_a, client_b, channel) = TestServer::start2(cx_a, cx_b).await; + let (_server, client_a, client_b, channel) = TestServer::start2(cx_a, cx_b).await; let mut cx_a2 = cx_a.clone(); let (workspace_a, cx_a) = client_a.build_test_workspace(cx_a).await; @@ -2081,7 +2081,7 @@ async fn test_following_to_channel_notes_other_workspace( #[gpui::test] async fn test_following_while_deactivated(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - let (_, client_a, client_b, channel) = TestServer::start2(cx_a, cx_b).await; + let (_server, client_a, client_b, channel) = TestServer::start2(cx_a, cx_b).await; let mut cx_a2 = cx_a.clone(); let (workspace_a, cx_a) = client_a.build_test_workspace(cx_a).await; diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index 515fef74c3..08e6bf9a68 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -240,7 +240,12 @@ impl TestServer { Executor::Deterministic(cx.background_executor().clone()), )) .detach(); - let connection_id = connection_id_rx.await.unwrap(); + let connection_id = connection_id_rx.await.map_err(|e| { + EstablishConnectionError::Other(anyhow!( + "{} (is server shutting down?)", + e + )) + })?; connection_killers .lock() .insert(connection_id.into(), killed); diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 4222404a47..c73f52a99e 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -361,8 +361,8 @@ impl Peer { self.connections.write().remove(&connection_id); } + #[cfg(any(test, feature = "test-support"))] pub fn reset(&self, epoch: u32) { - self.teardown(); self.next_connection_id.store(0, SeqCst); self.epoch.store(epoch, SeqCst); }