Store operations for unknown buffers when there are outstanding buffer RPC requests

This commit is contained in:
Max Brunsfeld 2022-02-23 18:22:56 -08:00
parent f1921c8df5
commit a6613d5345

View file

@ -32,10 +32,7 @@ use std::{
ops::Range,
path::{Component, Path, PathBuf},
rc::Rc,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
sync::{atomic::AtomicBool, Arc},
time::Instant,
};
use util::{post_inc, ResultExt, TryFutureExt as _};
@ -57,18 +54,23 @@ pub struct Project {
collaborators: HashMap<PeerId, Collaborator>,
subscriptions: Vec<client::Subscription>,
language_servers_with_diagnostics_running: isize,
open_buffers: HashMap<u64, OpenBuffer>,
opened_buffer: broadcast::Sender<()>,
loading_buffers: HashMap<
ProjectPath,
postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
>,
buffer_request_count: Rc<AtomicUsize>,
preserved_buffers: Rc<RefCell<Vec<ModelHandle<Buffer>>>>,
buffers_state: Rc<RefCell<ProjectBuffers>>,
shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
nonce: u128,
}
#[derive(Default)]
struct ProjectBuffers {
buffer_request_count: usize,
preserved_buffers: Vec<ModelHandle<Buffer>>,
open_buffers: HashMap<u64, OpenBuffer>,
}
enum OpenBuffer {
Loaded(WeakModelHandle<Buffer>),
Loading(Vec<Operation>),
@ -149,10 +151,7 @@ pub struct Symbol {
pub signature: [u8; 32],
}
pub struct BufferRequestHandle {
buffer_request_count: Rc<AtomicUsize>,
preserved_buffers: Rc<RefCell<Vec<ModelHandle<Buffer>>>>,
}
pub struct BufferRequestHandle(Rc<RefCell<ProjectBuffers>>);
#[derive(Default)]
pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
@ -282,10 +281,9 @@ impl Project {
Self {
worktrees: Default::default(),
collaborators: Default::default(),
open_buffers: Default::default(),
buffers_state: Default::default(),
loading_buffers: Default::default(),
shared_buffers: Default::default(),
preserved_buffers: Default::default(),
client_state: ProjectClientState::Local {
is_shared: false,
remote_id_tx,
@ -301,7 +299,6 @@ impl Project {
fs,
language_servers_with_diagnostics_running: 0,
language_servers: Default::default(),
buffer_request_count: Default::default(),
started_language_servers: Default::default(),
nonce: StdRng::from_entropy().gen(),
}
@ -337,7 +334,6 @@ impl Project {
let this = cx.add_model(|cx| {
let mut this = Self {
worktrees: Vec::new(),
open_buffers: Default::default(),
loading_buffers: Default::default(),
opened_buffer: broadcast::channel(1).0,
shared_buffers: Default::default(),
@ -356,8 +352,7 @@ impl Project {
language_servers_with_diagnostics_running: 0,
language_servers: Default::default(),
started_language_servers: Default::default(),
buffer_request_count: Default::default(),
preserved_buffers: Default::default(),
buffers_state: Default::default(),
nonce: StdRng::from_entropy().gen(),
};
for worktree in worktrees {
@ -406,7 +401,9 @@ impl Project {
#[cfg(any(test, feature = "test-support"))]
pub fn has_buffered_operations(&self) -> bool {
self.open_buffers
self.buffers_state
.borrow()
.open_buffers
.values()
.any(|buffer| matches!(buffer, OpenBuffer::Loading(_)))
}
@ -637,11 +634,6 @@ impl Project {
*tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
// Record the fact that the buffer is no longer loading.
this.loading_buffers.remove(&project_path);
if this.loading_buffers.is_empty() {
this.open_buffers
.retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_)))
}
let buffer = load_result.map_err(Arc::new)?;
Ok(buffer)
}));
@ -754,18 +746,7 @@ impl Project {
}
fn start_buffer_request(&self, cx: &AppContext) -> BufferRequestHandle {
if self.buffer_request_count.fetch_add(1, Ordering::SeqCst) == 0 {
self.preserved_buffers.borrow_mut().extend(
self.open_buffers
.values()
.filter_map(|buffer| buffer.upgrade(cx)),
)
}
BufferRequestHandle {
buffer_request_count: self.buffer_request_count.clone(),
preserved_buffers: self.preserved_buffers.clone(),
}
BufferRequestHandle::new(self.buffers_state.clone(), cx)
}
pub fn save_buffer_as(
@ -796,16 +777,20 @@ impl Project {
pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
let path = path.into();
if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
self.open_buffers.iter().any(|(_, buffer)| {
if let Some(buffer) = buffer.upgrade(cx) {
if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
if file.worktree == worktree && file.path() == &path.path {
return true;
self.buffers_state
.borrow()
.open_buffers
.iter()
.any(|(_, buffer)| {
if let Some(buffer) = buffer.upgrade(cx) {
if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
if file.worktree == worktree && file.path() == &path.path {
return true;
}
}
}
}
false
})
false
})
} else {
false
}
@ -818,18 +803,21 @@ impl Project {
) -> Option<ModelHandle<Buffer>> {
let mut result = None;
let worktree = self.worktree_for_id(path.worktree_id, cx)?;
self.open_buffers.retain(|_, buffer| {
if let Some(buffer) = buffer.upgrade(cx) {
if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
if file.worktree == worktree && file.path() == &path.path {
result = Some(buffer);
self.buffers_state
.borrow_mut()
.open_buffers
.retain(|_, buffer| {
if let Some(buffer) = buffer.upgrade(cx) {
if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
if file.worktree == worktree && file.path() == &path.path {
result = Some(buffer);
}
}
true
} else {
false
}
true
} else {
false
}
});
});
result
}
@ -841,6 +829,8 @@ impl Project {
) -> Result<()> {
let remote_id = buffer.read(cx).remote_id();
match self
.buffers_state
.borrow_mut()
.open_buffers
.insert(remote_id, OpenBuffer::Loaded(buffer.downgrade()))
{
@ -1175,7 +1165,7 @@ impl Project {
path: relative_path.into(),
};
for buffer in self.open_buffers.values() {
for buffer in self.buffers_state.borrow().open_buffers.values() {
if let Some(buffer) = buffer.upgrade(cx) {
if buffer
.read(cx)
@ -2211,7 +2201,7 @@ impl Project {
) {
let snapshot = worktree_handle.read(cx).snapshot();
let mut buffers_to_delete = Vec::new();
for (buffer_id, buffer) in &self.open_buffers {
for (buffer_id, buffer) in &self.buffers_state.borrow().open_buffers {
if let Some(buffer) = buffer.upgrade(cx) {
buffer.update(cx, |buffer, cx| {
if let Some(old_file) = File::from_dyn(buffer.file()) {
@ -2268,7 +2258,10 @@ impl Project {
}
for buffer_id in buffers_to_delete {
self.open_buffers.remove(&buffer_id);
self.buffers_state
.borrow_mut()
.open_buffers
.remove(&buffer_id);
}
}
@ -2396,7 +2389,7 @@ impl Project {
.ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
.replica_id;
this.shared_buffers.remove(&peer_id);
for (_, buffer) in &this.open_buffers {
for (_, buffer) in &this.buffers_state.borrow().open_buffers {
if let Some(buffer) = buffer.upgrade(cx) {
buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
}
@ -2523,7 +2516,9 @@ impl Project {
.map(|op| language::proto::deserialize_operation(op))
.collect::<Result<Vec<_>, _>>()?;
let is_remote = this.is_remote();
match this.open_buffers.entry(buffer_id) {
let mut buffers_state = this.buffers_state.borrow_mut();
let buffer_request_count = buffers_state.buffer_request_count;
match buffers_state.open_buffers.entry(buffer_id) {
hash_map::Entry::Occupied(mut e) => match e.get_mut() {
OpenBuffer::Loaded(buffer) => {
if let Some(buffer) = buffer.upgrade(cx) {
@ -2533,7 +2528,7 @@ impl Project {
OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops),
},
hash_map::Entry::Vacant(e) => {
if is_remote && this.loading_buffers.len() > 0 {
if is_remote && buffer_request_count > 0 {
e.insert(OpenBuffer::Loading(ops));
}
}
@ -2557,6 +2552,8 @@ impl Project {
.ok_or_else(|| anyhow!("no such worktree"))?;
let file = File::from_proto(file, worktree.clone(), cx)?;
let buffer = this
.buffers_state
.borrow_mut()
.open_buffers
.get_mut(&buffer_id)
.and_then(|b| b.upgrade(cx))
@ -2937,6 +2934,7 @@ impl Project {
let transaction = language::proto::deserialize_transaction(transaction)?;
project_transaction.0.insert(buffer, transaction);
}
for (buffer, transaction) in &project_transaction.0 {
buffer
.update(&mut cx, |buffer, _| {
@ -2991,7 +2989,9 @@ impl Project {
proto::buffer::Variant::Id(id) => {
let buffer = loop {
let buffer = this.read_with(&cx, |this, cx| {
this.open_buffers
this.buffers_state
.borrow()
.open_buffers
.get(&id)
.and_then(|buffer| buffer.upgrade(cx))
});
@ -3100,6 +3100,8 @@ impl Project {
this.update(&mut cx, |this, cx| {
let buffer = this
.buffers_state
.borrow()
.open_buffers
.get(&envelope.payload.buffer_id)
.and_then(|buffer| buffer.upgrade(cx));
@ -3126,6 +3128,8 @@ impl Project {
.into();
this.update(&mut cx, |this, cx| {
let buffer = this
.buffers_state
.borrow()
.open_buffers
.get(&payload.buffer_id)
.and_then(|buffer| buffer.upgrade(cx));
@ -3177,25 +3181,43 @@ impl Project {
}
impl BufferRequestHandle {
fn new(state: Rc<RefCell<ProjectBuffers>>, cx: &AppContext) -> Self {
{
let state = &mut *state.borrow_mut();
state.buffer_request_count += 1;
if state.buffer_request_count == 1 {
state.preserved_buffers.extend(
state
.open_buffers
.values()
.filter_map(|buffer| buffer.upgrade(cx)),
)
}
}
Self(state)
}
fn preserve_buffer(&self, buffer: ModelHandle<Buffer>) {
self.preserved_buffers.borrow_mut().push(buffer);
self.0.borrow_mut().preserved_buffers.push(buffer);
}
}
impl Clone for BufferRequestHandle {
fn clone(&self) -> Self {
self.buffer_request_count.fetch_add(1, Ordering::SeqCst);
Self {
buffer_request_count: self.buffer_request_count.clone(),
preserved_buffers: self.preserved_buffers.clone(),
}
self.0.borrow_mut().buffer_request_count += 1;
Self(self.0.clone())
}
}
impl Drop for BufferRequestHandle {
fn drop(&mut self) {
if self.buffer_request_count.fetch_sub(1, Ordering::SeqCst) == 1 {
self.preserved_buffers.borrow_mut().clear();
let mut state = self.0.borrow_mut();
state.buffer_request_count -= 1;
if state.buffer_request_count == 0 {
state.preserved_buffers.clear();
state
.open_buffers
.retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_)))
}
}
}