From 2723ec18f35aa834a860d8514b205e5a12b8c874 Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Tue, 27 Aug 2024 20:49:37 -0600 Subject: [PATCH] TEMP --- crates/project/src/buffer_store.rs | 139 ++++++++++++++++++++++++++++- crates/project/src/project.rs | 120 +++++-------------------- 2 files changed, 158 insertions(+), 101 deletions(-) diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index ddca456400..1c14a0c670 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -4,6 +4,7 @@ use crate::{ Item, NoRepositoryError, ProjectPath, }; use anyhow::{anyhow, Context as _, Result}; +use client::Client; use collections::{hash_map, HashMap, HashSet}; use fs::Fs; use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; @@ -23,7 +24,7 @@ use rpc::{ use smol::channel::Receiver; use std::{io, path::Path, str::FromStr as _, sync::Arc}; use text::BufferId; -use util::{debug_panic, maybe, ResultExt as _}; +use util::{debug_panic, maybe, ResultExt as _, TryFutureExt}; use worktree::{ File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree, WorktreeId, @@ -45,6 +46,7 @@ pub struct BufferStore { loading_remote_buffers_by_id: HashMap>, remote_buffer_listeners: HashMap, anyhow::Error>>>>, + shared_buffers: HashMap>, } enum OpenBuffer { @@ -93,6 +95,7 @@ impl BufferStore { local_buffer_ids_by_path: Default::default(), local_buffer_ids_by_entry_id: Default::default(), loading_buffers_by_path: Default::default(), + shared_buffers: Default::default(), } } @@ -1075,6 +1078,90 @@ impl BufferStore { })? } + pub fn handle_synchronize_buffers( + &mut self, + envelope: TypedEnvelope, + cx: &mut ModelContext, + client: Arc, + ) -> Result { + let project_id = envelope.payload.project_id; + let mut response = proto::SynchronizeBuffersResponse { + buffers: Default::default(), + }; + let Some(guest_id) = envelope.original_sender_id else { + anyhow::bail!("missing original_sender_id on SynchronizeBuffers request"); + }; + + self.shared_buffers.entry(guest_id).or_default().clear(); + for buffer in envelope.payload.buffers { + let buffer_id = BufferId::new(buffer.id)?; + let remote_version = language::proto::deserialize_version(&buffer.version); + if let Some(buffer) = self.get(buffer_id) { + self.shared_buffers + .entry(guest_id) + .or_default() + .insert(buffer_id); + + let buffer = buffer.read(cx); + response.buffers.push(proto::BufferVersion { + id: buffer_id.into(), + version: language::proto::serialize_version(&buffer.version), + }); + + let operations = buffer.serialize_ops(Some(remote_version), cx); + let client = client.clone(); + if let Some(file) = buffer.file() { + client + .send(proto::UpdateBufferFile { + project_id, + buffer_id: buffer_id.into(), + file: Some(file.to_proto(cx)), + }) + .log_err(); + } + + client + .send(proto::UpdateDiffBase { + project_id, + buffer_id: buffer_id.into(), + diff_base: buffer.diff_base().map(ToString::to_string), + }) + .log_err(); + + client + .send(proto::BufferReloaded { + project_id, + buffer_id: buffer_id.into(), + version: language::proto::serialize_version(buffer.saved_version()), + mtime: buffer.saved_mtime().map(|time| time.into()), + line_ending: language::proto::serialize_line_ending(buffer.line_ending()) + as i32, + }) + .log_err(); + + cx.background_executor() + .spawn( + async move { + let operations = operations.await; + for chunk in split_operations(operations) { + client + .request(proto::UpdateBuffer { + project_id, + buffer_id: buffer_id.into(), + operations: chunk, + }) + .await?; + } + anyhow::Ok(()) + } + .log_err(), + ) + .detach(); + } + } + Ok(response) + } + pub fn handle_create_buffer_for_peer( &mut self, envelope: TypedEnvelope, @@ -1326,6 +1413,56 @@ impl BufferStore { receiver.next().await; } } + + pub fn create_shared_buffer_for_peer( + &mut self, + buffer: &Model, + peer_id: proto::PeerId, + client: &Client, + cx: &mut ModelContext, + ) -> BufferId { + let buffer_id = buffer.read(cx).remote_id(); + if !self + .shared_buffers + .entry(peer_id) + .or_default() + .insert(buffer_id) + { + return buffer_id; + } + let Some(remote_id) = self.remote_id else { + return buffer_id; + }; + + cx.spawn(|this, mut cx| async move { + BufferStore::create_buffer_for_peer( + this, + peer_id, + buffer_id, + remote_id, + client.clone().into(), + &mut cx, + ) + .await?; + anyhow::Ok(()) + }) + .detach_and_log_err(cx); + buffer_id + } + + pub fn forget_shared_buffers(&mut self) { + self.shared_buffers.clear(); + } + + pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) { + self.shared_buffers.remove(peer_id); + } + + pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) { + if let Some(buffers) = self.shared_buffers.remove(old_peer_id) { + self.shared_buffers.insert(new_peer_id, buffers); + } + } } impl OpenBuffer { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index f676f72843..be9cc77759 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -16,7 +16,7 @@ mod project_tests; pub mod search_history; mod yarn; -use anyhow::{anyhow, bail, Context as _, Result}; +use anyhow::{anyhow, Context as _, Result}; use async_trait::async_trait; use buffer_store::{BufferStore, BufferStoreEvent}; use client::{ @@ -208,7 +208,6 @@ pub struct Project { worktree_store: Model, buffer_store: Model, _subscriptions: Vec, - shared_buffers: HashMap>, #[allow(clippy::type_complexity)] loading_worktrees: HashMap, Shared, Arc>>>>, @@ -807,7 +806,6 @@ impl Project { collaborators: Default::default(), worktree_store, buffer_store, - shared_buffers: Default::default(), loading_worktrees: Default::default(), buffer_snapshots: Default::default(), join_project_response_message_id: 0, @@ -979,7 +977,6 @@ impl Project { buffer_ordered_messages_tx: tx, buffer_store: buffer_store.clone(), worktree_store, - shared_buffers: Default::default(), loading_worktrees: Default::default(), active_entry: None, collaborators: Default::default(), @@ -1728,7 +1725,8 @@ impl Project { message: proto::ResharedProject, cx: &mut ModelContext, ) -> Result<()> { - self.shared_buffers.clear(); + self.buffer_store + .update(cx, |buffer_store, _| buffer_store.forget_shared_buffers()); self.set_collaborators_from_proto(message.collaborators, cx)?; self.metadata_changed(cx); cx.emit(Event::Reshared); @@ -1798,13 +1796,14 @@ impl Project { if let ProjectClientState::Shared { remote_id, .. } = self.client_state { self.client_state = ProjectClientState::Local; self.collaborators.clear(); - self.shared_buffers.clear(); self.client_subscriptions.clear(); self.worktree_store.update(cx, |store, cx| { store.set_shared(false, cx); }); - self.buffer_store - .update(cx, |buffer_store, cx| buffer_store.set_remote_id(None, cx)); + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.forget_shared_buffers(); + buffer_store.set_remote_id(None, cx) + }); self.client .send(proto::UnshareProject { project_id: remote_id, @@ -8542,7 +8541,9 @@ impl Project { let collaborator = Collaborator::from_proto(collaborator)?; this.update(&mut cx, |this, cx| { - this.shared_buffers.remove(&collaborator.peer_id); + this.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.forget_shared_buffers_for(&collaborator.peer_id); + }); cx.emit(Event::CollaboratorJoined(collaborator.peer_id)); this.collaborators .insert(collaborator.peer_id, collaborator); @@ -8573,16 +8574,10 @@ impl Project { let is_host = collaborator.replica_id == 0; this.collaborators.insert(new_peer_id, collaborator); - let buffers = this.shared_buffers.remove(&old_peer_id); - log::info!( - "peer {} became {}. moving buffers {:?}", - old_peer_id, - new_peer_id, - &buffers - ); - if let Some(buffers) = buffers { - this.shared_buffers.insert(new_peer_id, buffers); - } + log::info!("peer {} became {}", old_peer_id, new_peer_id,); + this.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.update_peer_id(&old_peer_id, new_peer_id) + }); if is_host { this.buffer_store @@ -8617,11 +8612,11 @@ impl Project { .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? .replica_id; this.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.forget_shared_buffers_for(&peer_id); for buffer in buffer_store.buffers() { buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); } }); - this.shared_buffers.remove(&peer_id); cx.emit(Event::CollaboratorLeft(peer_id)); cx.notify(); @@ -8887,86 +8882,11 @@ impl Project { envelope: TypedEnvelope, mut cx: AsyncAppContext, ) -> Result { - let project_id = envelope.payload.project_id; - let mut response = proto::SynchronizeBuffersResponse { - buffers: Default::default(), - }; - - this.update(&mut cx, |this, cx| { - let Some(guest_id) = envelope.original_sender_id else { - error!("missing original_sender_id on SynchronizeBuffers request"); - bail!("missing original_sender_id on SynchronizeBuffers request"); - }; - - this.shared_buffers.entry(guest_id).or_default().clear(); - for buffer in envelope.payload.buffers { - let buffer_id = BufferId::new(buffer.id)?; - let remote_version = language::proto::deserialize_version(&buffer.version); - if let Some(buffer) = this.buffer_for_id(buffer_id, cx) { - this.shared_buffers - .entry(guest_id) - .or_default() - .insert(buffer_id); - - let buffer = buffer.read(cx); - response.buffers.push(proto::BufferVersion { - id: buffer_id.into(), - version: language::proto::serialize_version(&buffer.version), - }); - - let operations = buffer.serialize_ops(Some(remote_version), cx); - let client = this.client.clone(); - if let Some(file) = buffer.file() { - client - .send(proto::UpdateBufferFile { - project_id, - buffer_id: buffer_id.into(), - file: Some(file.to_proto(cx)), - }) - .log_err(); - } - - client - .send(proto::UpdateDiffBase { - project_id, - buffer_id: buffer_id.into(), - diff_base: buffer.diff_base().map(ToString::to_string), - }) - .log_err(); - - client - .send(proto::BufferReloaded { - project_id, - buffer_id: buffer_id.into(), - version: language::proto::serialize_version(buffer.saved_version()), - mtime: buffer.saved_mtime().map(|time| time.into()), - line_ending: language::proto::serialize_line_ending( - buffer.line_ending(), - ) as i32, - }) - .log_err(); - - cx.background_executor() - .spawn( - async move { - let operations = operations.await; - for chunk in split_operations(operations) { - client - .request(proto::UpdateBuffer { - project_id, - buffer_id: buffer_id.into(), - operations: chunk, - }) - .await?; - } - anyhow::Ok(()) - } - .log_err(), - ) - .detach(); - } - } - Ok(()) + let response = this.update(&mut cx, |this, cx| { + let client = this.client.clone(); + this.buffer_store.update(cx, |this, cx| { + this.handle_synchronize_buffers(envelope, cx, client) + }) })??; Ok(response)