This commit is contained in:
Conrad Irwin 2024-08-27 20:49:37 -06:00
parent bef575e30a
commit 2723ec18f3
2 changed files with 158 additions and 101 deletions

View file

@ -4,6 +4,7 @@ use crate::{
Item, NoRepositoryError, ProjectPath,
};
use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
@ -23,7 +24,7 @@ use rpc::{
use smol::channel::Receiver;
use std::{io, path::Path, str::FromStr as _, sync::Arc};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _};
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{
File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
WorktreeId,
@ -45,6 +46,7 @@ pub struct BufferStore {
loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
remote_buffer_listeners:
HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
}
enum OpenBuffer {
@ -93,6 +95,7 @@ impl BufferStore {
local_buffer_ids_by_path: Default::default(),
local_buffer_ids_by_entry_id: Default::default(),
loading_buffers_by_path: Default::default(),
shared_buffers: Default::default(),
}
}
@ -1075,6 +1078,90 @@ impl BufferStore {
})?
}
pub fn handle_synchronize_buffers(
&mut self,
envelope: TypedEnvelope<proto::SynchronizeBuffers>,
cx: &mut ModelContext<Self>,
client: Arc<Client>,
) -> Result<proto::SynchronizeBuffersResponse> {
let project_id = envelope.payload.project_id;
let mut response = proto::SynchronizeBuffersResponse {
buffers: Default::default(),
};
let Some(guest_id) = envelope.original_sender_id else {
anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
};
self.shared_buffers.entry(guest_id).or_default().clear();
for buffer in envelope.payload.buffers {
let buffer_id = BufferId::new(buffer.id)?;
let remote_version = language::proto::deserialize_version(&buffer.version);
if let Some(buffer) = self.get(buffer_id) {
self.shared_buffers
.entry(guest_id)
.or_default()
.insert(buffer_id);
let buffer = buffer.read(cx);
response.buffers.push(proto::BufferVersion {
id: buffer_id.into(),
version: language::proto::serialize_version(&buffer.version),
});
let operations = buffer.serialize_ops(Some(remote_version), cx);
let client = client.clone();
if let Some(file) = buffer.file() {
client
.send(proto::UpdateBufferFile {
project_id,
buffer_id: buffer_id.into(),
file: Some(file.to_proto(cx)),
})
.log_err();
}
client
.send(proto::UpdateDiffBase {
project_id,
buffer_id: buffer_id.into(),
diff_base: buffer.diff_base().map(ToString::to_string),
})
.log_err();
client
.send(proto::BufferReloaded {
project_id,
buffer_id: buffer_id.into(),
version: language::proto::serialize_version(buffer.saved_version()),
mtime: buffer.saved_mtime().map(|time| time.into()),
line_ending: language::proto::serialize_line_ending(buffer.line_ending())
as i32,
})
.log_err();
cx.background_executor()
.spawn(
async move {
let operations = operations.await;
for chunk in split_operations(operations) {
client
.request(proto::UpdateBuffer {
project_id,
buffer_id: buffer_id.into(),
operations: chunk,
})
.await?;
}
anyhow::Ok(())
}
.log_err(),
)
.detach();
}
}
Ok(response)
}
pub fn handle_create_buffer_for_peer(
&mut self,
envelope: TypedEnvelope<proto::CreateBufferForPeer>,
@ -1326,6 +1413,56 @@ impl BufferStore {
receiver.next().await;
}
}
pub fn create_shared_buffer_for_peer(
&mut self,
buffer: &Model<Buffer>,
peer_id: proto::PeerId,
client: &Client,
cx: &mut ModelContext<Self>,
) -> BufferId {
let buffer_id = buffer.read(cx).remote_id();
if !self
.shared_buffers
.entry(peer_id)
.or_default()
.insert(buffer_id)
{
return buffer_id;
}
let Some(remote_id) = self.remote_id else {
return buffer_id;
};
cx.spawn(|this, mut cx| async move {
BufferStore::create_buffer_for_peer(
this,
peer_id,
buffer_id,
remote_id,
client.clone().into(),
&mut cx,
)
.await?;
anyhow::Ok(())
})
.detach_and_log_err(cx);
buffer_id
}
pub fn forget_shared_buffers(&mut self) {
self.shared_buffers.clear();
}
pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
self.shared_buffers.remove(peer_id);
}
pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
self.shared_buffers.insert(new_peer_id, buffers);
}
}
}
impl OpenBuffer {

View file

@ -16,7 +16,7 @@ mod project_tests;
pub mod search_history;
mod yarn;
use anyhow::{anyhow, bail, Context as _, Result};
use anyhow::{anyhow, Context as _, Result};
use async_trait::async_trait;
use buffer_store::{BufferStore, BufferStoreEvent};
use client::{
@ -208,7 +208,6 @@ pub struct Project {
worktree_store: Model<WorktreeStore>,
buffer_store: Model<BufferStore>,
_subscriptions: Vec<gpui::Subscription>,
shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
#[allow(clippy::type_complexity)]
loading_worktrees:
HashMap<Arc<Path>, Shared<Task<Result<Model<Worktree>, Arc<anyhow::Error>>>>>,
@ -807,7 +806,6 @@ impl Project {
collaborators: Default::default(),
worktree_store,
buffer_store,
shared_buffers: Default::default(),
loading_worktrees: Default::default(),
buffer_snapshots: Default::default(),
join_project_response_message_id: 0,
@ -979,7 +977,6 @@ impl Project {
buffer_ordered_messages_tx: tx,
buffer_store: buffer_store.clone(),
worktree_store,
shared_buffers: Default::default(),
loading_worktrees: Default::default(),
active_entry: None,
collaborators: Default::default(),
@ -1728,7 +1725,8 @@ impl Project {
message: proto::ResharedProject,
cx: &mut ModelContext<Self>,
) -> Result<()> {
self.shared_buffers.clear();
self.buffer_store
.update(cx, |buffer_store, _| buffer_store.forget_shared_buffers());
self.set_collaborators_from_proto(message.collaborators, cx)?;
self.metadata_changed(cx);
cx.emit(Event::Reshared);
@ -1798,13 +1796,14 @@ impl Project {
if let ProjectClientState::Shared { remote_id, .. } = self.client_state {
self.client_state = ProjectClientState::Local;
self.collaborators.clear();
self.shared_buffers.clear();
self.client_subscriptions.clear();
self.worktree_store.update(cx, |store, cx| {
store.set_shared(false, cx);
});
self.buffer_store
.update(cx, |buffer_store, cx| buffer_store.set_remote_id(None, cx));
self.buffer_store.update(cx, |buffer_store, cx| {
buffer_store.forget_shared_buffers();
buffer_store.set_remote_id(None, cx)
});
self.client
.send(proto::UnshareProject {
project_id: remote_id,
@ -8542,7 +8541,9 @@ impl Project {
let collaborator = Collaborator::from_proto(collaborator)?;
this.update(&mut cx, |this, cx| {
this.shared_buffers.remove(&collaborator.peer_id);
this.buffer_store.update(cx, |buffer_store, cx| {
buffer_store.forget_shared_buffers_for(&collaborator.peer_id);
});
cx.emit(Event::CollaboratorJoined(collaborator.peer_id));
this.collaborators
.insert(collaborator.peer_id, collaborator);
@ -8573,16 +8574,10 @@ impl Project {
let is_host = collaborator.replica_id == 0;
this.collaborators.insert(new_peer_id, collaborator);
let buffers = this.shared_buffers.remove(&old_peer_id);
log::info!(
"peer {} became {}. moving buffers {:?}",
old_peer_id,
new_peer_id,
&buffers
);
if let Some(buffers) = buffers {
this.shared_buffers.insert(new_peer_id, buffers);
}
log::info!("peer {} became {}", old_peer_id, new_peer_id,);
this.buffer_store.update(cx, |buffer_store, cx| {
buffer_store.update_peer_id(&old_peer_id, new_peer_id)
});
if is_host {
this.buffer_store
@ -8617,11 +8612,11 @@ impl Project {
.ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
.replica_id;
this.buffer_store.update(cx, |buffer_store, cx| {
buffer_store.forget_shared_buffers_for(&peer_id);
for buffer in buffer_store.buffers() {
buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
}
});
this.shared_buffers.remove(&peer_id);
cx.emit(Event::CollaboratorLeft(peer_id));
cx.notify();
@ -8887,86 +8882,11 @@ impl Project {
envelope: TypedEnvelope<proto::SynchronizeBuffers>,
mut cx: AsyncAppContext,
) -> Result<proto::SynchronizeBuffersResponse> {
let project_id = envelope.payload.project_id;
let mut response = proto::SynchronizeBuffersResponse {
buffers: Default::default(),
};
this.update(&mut cx, |this, cx| {
let Some(guest_id) = envelope.original_sender_id else {
error!("missing original_sender_id on SynchronizeBuffers request");
bail!("missing original_sender_id on SynchronizeBuffers request");
};
this.shared_buffers.entry(guest_id).or_default().clear();
for buffer in envelope.payload.buffers {
let buffer_id = BufferId::new(buffer.id)?;
let remote_version = language::proto::deserialize_version(&buffer.version);
if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
this.shared_buffers
.entry(guest_id)
.or_default()
.insert(buffer_id);
let buffer = buffer.read(cx);
response.buffers.push(proto::BufferVersion {
id: buffer_id.into(),
version: language::proto::serialize_version(&buffer.version),
});
let operations = buffer.serialize_ops(Some(remote_version), cx);
let response = this.update(&mut cx, |this, cx| {
let client = this.client.clone();
if let Some(file) = buffer.file() {
client
.send(proto::UpdateBufferFile {
project_id,
buffer_id: buffer_id.into(),
file: Some(file.to_proto(cx)),
this.buffer_store.update(cx, |this, cx| {
this.handle_synchronize_buffers(envelope, cx, client)
})
.log_err();
}
client
.send(proto::UpdateDiffBase {
project_id,
buffer_id: buffer_id.into(),
diff_base: buffer.diff_base().map(ToString::to_string),
})
.log_err();
client
.send(proto::BufferReloaded {
project_id,
buffer_id: buffer_id.into(),
version: language::proto::serialize_version(buffer.saved_version()),
mtime: buffer.saved_mtime().map(|time| time.into()),
line_ending: language::proto::serialize_line_ending(
buffer.line_ending(),
) as i32,
})
.log_err();
cx.background_executor()
.spawn(
async move {
let operations = operations.await;
for chunk in split_operations(operations) {
client
.request(proto::UpdateBuffer {
project_id,
buffer_id: buffer_id.into(),
operations: chunk,
})
.await?;
}
anyhow::Ok(())
}
.log_err(),
)
.detach();
}
}
Ok(())
})??;
Ok(response)