Issue only one OpenFile and CloseFile request per handle

Antonio Scandurra 2021-06-21 11:49:50 +02:00
parent 96c118b4df
commit f4d6fbe14f
4 changed files with 98 additions and 34 deletions
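The change, in one sentence: a remote file handle now issues exactly one OpenFile request when it is first opened and exactly one CloseFile request when the last reference to it is dropped, and concurrent opens of the same path share a single handle. Below is a minimal, self-contained Rust sketch of that pattern; it uses smol like the real code, but Rpc, FileHandleState, and Handles here are simplified stand-ins, not Zed's actual types.

use smol::lock::Mutex as AsyncMutex;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

#[derive(Default)]
struct Rpc {
    open_requests: Mutex<usize>, // counts OpenFile requests actually sent
}

impl Rpc {
    async fn open_file(&self, _path: &str) -> u64 {
        *self.open_requests.lock().unwrap() += 1;
        42 // pretend the server assigned this handle id
    }

    fn close_file(&self, id: u64) {
        println!("CloseFile {{ id: {} }}", id);
    }
}

struct FileHandleState {
    id: u64,
    rpc: Option<Arc<Rpc>>,
}

impl Drop for FileHandleState {
    fn drop(&mut self) {
        // One CloseFile per handle, sent when the last clone of the shared
        // state goes away.
        if let Some(rpc) = self.rpc.take() {
            rpc.close_file(self.id);
        }
    }
}

#[derive(Default)]
struct Handles {
    // One slot per path. The async mutex serializes concurrent opens of the
    // same path; the Weak lets a still-live handle be reused without a
    // second OpenFile request.
    by_path: Mutex<HashMap<String, Arc<AsyncMutex<Weak<Mutex<FileHandleState>>>>>>,
}

impl Handles {
    async fn open(&self, rpc: &Arc<Rpc>, path: &str) -> Arc<Mutex<FileHandleState>> {
        let slot = self
            .by_path
            .lock()
            .unwrap()
            .entry(path.to_string())
            .or_insert_with(|| Arc::new(AsyncMutex::new(Weak::new())))
            .clone();
        let mut slot = slot.lock().await;
        if let Some(existing) = slot.upgrade() {
            existing // a live handle already exists; no new OpenFile request
        } else {
            let id = rpc.open_file(path).await;
            let state = Arc::new(Mutex::new(FileHandleState { id, rpc: Some(rpc.clone()) }));
            *slot = Arc::downgrade(&state);
            state
        }
    }
}

fn main() {
    smol::block_on(async {
        let rpc = Arc::new(Rpc::default());
        let handles = Handles::default();
        let a = handles.open(&rpc, "src/lib.rs").await;
        let b = handles.open(&rpc, "src/lib.rs").await;
        assert!(Arc::ptr_eq(&a, &b)); // both callers share one handle
        assert_eq!(*rpc.open_requests.lock().unwrap(), 1); // one OpenFile, not two
        drop(a);
        drop(b); // last reference gone -> exactly one CloseFile is sent
    });
}

In the real commit the Drop impl cannot await, so it spawns a detached smol task to send CloseFile; the sketch keeps Drop synchronous for brevity.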

View file

@@ -32,7 +32,7 @@ use std::{
ops::{Deref, DerefMut, Range},
str,
sync::Arc,
- time::{Duration, Instant, SystemTime, UNIX_EPOCH},
+ time::{Duration, Instant},
};
const UNDO_GROUP_INTERVAL: Duration = Duration::from_millis(300);
@@ -110,7 +110,7 @@ pub struct Buffer {
deleted_text: Rope,
pub version: time::Global,
saved_version: time::Global,
- saved_mtime: SystemTime,
+ saved_mtime: Duration,
last_edit: time::Local,
undo_map: UndoMap,
history: History,
@@ -467,7 +467,7 @@ impl Buffer {
cx.emit(Event::FileHandleChanged);
});
} else {
- saved_mtime = UNIX_EPOCH;
+ saved_mtime = Duration::ZERO;
}
let mut fragments = SumTree::new();
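The mtime fields above switch from SystemTime to a Duration measured from the Unix epoch (with Duration::ZERO standing in for "never saved"), which is the form the worktree code below moves over RPC as whole seconds via as_secs() and rebuilds with Duration::from_secs(response.mtime). A small illustrative helper, assuming only the standard library (mtime_since_epoch is not a function from this codebase):

use std::path::Path;
use std::time::{Duration, UNIX_EPOCH};

// SystemTime -> Duration since the Unix epoch; .as_secs() of this value is
// what goes over the wire, and the remote side stores
// Duration::from_secs(response.mtime).
fn mtime_since_epoch(path: &Path) -> std::io::Result<Duration> {
    let modified = std::fs::metadata(path)?.modified()?;
    Ok(modified.duration_since(UNIX_EPOCH).expect("mtime earlier than the Unix epoch"))
}

fn main() -> std::io::Result<()> {
    println!("{}", mtime_since_epoch(Path::new("Cargo.toml"))?.as_secs());
    Ok(())
}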

View file

@@ -11,6 +11,7 @@ use std::collections::{HashMap, HashSet};
use std::time::Duration;
use std::{convert::TryFrom, future::Future, sync::Arc};
use surf::Url;
+ use zed_rpc::proto::EnvelopedMessage;
use zed_rpc::{proto::RequestMessage, rest, Peer, TypedEnvelope};
use zed_rpc::{PeerId, Receipt};
@@ -191,6 +192,14 @@ impl Client {
})
}
+ pub fn send<T: EnvelopedMessage>(
+ &self,
+ connection_id: ConnectionId,
+ message: T,
+ ) -> impl Future<Output = Result<()>> {
+ self.peer.send(connection_id, message)
+ }
pub fn request<T: RequestMessage>(
&self,
connection_id: ConnectionId,

View file

@@ -27,7 +27,6 @@ use std::{
future::Future,
path::{Path, PathBuf},
sync::Arc,
- time::UNIX_EPOCH,
};
use zed_rpc::{proto, TypedEnvelope};
@@ -131,7 +130,7 @@ mod remote {
let file = cx.update(|cx| worktree.file(&message.path, cx)).await?;
let id = file.id() as u64;
- let mtime = file.mtime().duration_since(UNIX_EPOCH).unwrap().as_secs();
+ let mtime = file.mtime().as_secs();
*state
.shared_files

View file

@@ -19,7 +19,7 @@ use postage::{
prelude::{Sink, Stream},
watch,
};
- use smol::channel::Sender;
+ use smol::{channel::Sender, lock::Mutex as AsyncMutex};
use std::{
cmp,
collections::{HashMap, HashSet},
@@ -32,10 +32,10 @@ use std::{
os::unix::{ffi::OsStrExt, fs::MetadataExt},
path::{Path, PathBuf},
sync::{
- atomic::{AtomicUsize, Ordering::SeqCst},
+ atomic::{AtomicU64, Ordering::SeqCst},
Arc, Weak,
},
- time::{Duration, SystemTime},
+ time::{Duration, UNIX_EPOCH},
};
lazy_static! {
@@ -135,25 +135,40 @@ pub struct LocalWorktree {
snapshot: Snapshot,
background_snapshot: Arc<Mutex<Snapshot>>,
handles: Arc<Mutex<HashMap<Arc<Path>, Weak<Mutex<FileHandleState>>>>>,
- next_handle_id: AtomicUsize,
+ next_handle_id: AtomicU64,
scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
_event_stream_handle: fsevent::Handle,
poll_scheduled: bool,
rpc: Option<rpc::Client>,
}
- #[derive(Clone, Debug)]
+ #[derive(Clone)]
pub struct FileHandle {
worktree: ModelHandle<Worktree>,
state: Arc<Mutex<FileHandleState>>,
}
- #[derive(Clone, Debug, PartialEq, Eq)]
+ #[derive(Clone)]
struct FileHandleState {
path: Arc<Path>,
is_deleted: bool,
- mtime: SystemTime,
- id: usize,
+ mtime: Duration,
+ id: u64,
+ rpc: Option<(ConnectionId, rpc::Client)>,
}
+ impl Drop for FileHandleState {
+ fn drop(&mut self) {
+ if let Some((connection_id, rpc)) = self.rpc.take() {
+ let id = self.id;
+ smol::spawn(async move {
+ if let Err(error) = rpc.send(connection_id, proto::CloseFile { id }).await {
+ log::warn!("error closing file {}: {}", id, error);
+ }
+ })
+ .detach();
+ }
+ }
+ }
impl LocalWorktree {
@@ -315,7 +330,7 @@ impl LocalWorktree {
if let Some(handle) = handles.lock().get(&*path).and_then(Weak::upgrade) {
let mut handle = handle.lock();
- handle.mtime = file.metadata()?.modified()?;
+ handle.mtime = file.metadata()?.modified()?.duration_since(UNIX_EPOCH)?;
handle.is_deleted = false;
}
@@ -385,7 +400,7 @@ impl fmt::Debug for LocalWorktree {
pub struct RemoteWorktree {
id: usize,
snapshot: Snapshot,
- handles: Arc<Mutex<HashMap<Arc<Path>, Weak<Mutex<FileHandleState>>>>>,
+ handles: Arc<Mutex<HashMap<Arc<Path>, Arc<AsyncMutex<Weak<Mutex<FileHandleState>>>>>>>,
rpc: rpc::Client,
connection_id: ConnectionId,
}
@@ -622,7 +637,7 @@ impl fmt::Debug for Snapshot {
}
impl FileHandle {
- pub fn id(&self) -> usize {
+ pub fn id(&self) -> u64 {
self.state.lock().id
}
@@ -646,7 +661,7 @@ impl FileHandle {
self.state.lock().is_deleted
}
- pub fn mtime(&self) -> SystemTime {
+ pub fn mtime(&self) -> Duration {
self.state.lock().mtime
}
@@ -681,7 +696,9 @@ impl FileHandle {
cx.observe(&self.worktree, move |observer, worktree, cx| {
if let Some(cur_state) = cur_state.upgrade() {
let cur_state_unlocked = cur_state.lock();
- if *cur_state_unlocked != prev_state {
+ if cur_state_unlocked.mtime != prev_state.mtime
+ || cur_state_unlocked.path != prev_state.path
+ {
prev_state = cur_state_unlocked.clone();
drop(cur_state_unlocked);
callback(
@@ -1181,11 +1198,19 @@ impl BackgroundScanner {
let mut state = state.lock();
if state.path.as_ref() == path {
if let Ok(metadata) = &metadata {
- state.mtime = metadata.modified().unwrap();
+ state.mtime = metadata
+ .modified()
+ .unwrap()
+ .duration_since(UNIX_EPOCH)
+ .unwrap();
}
} else if state.path.starts_with(path) {
if let Ok(metadata) = fs::metadata(state.path.as_ref()) {
- state.mtime = metadata.modified().unwrap();
+ state.mtime = metadata
+ .modified()
+ .unwrap()
+ .duration_since(UNIX_EPOCH)
+ .unwrap();
}
}
}
@@ -1441,7 +1466,8 @@ impl WorktreeHandle for ModelHandle<Worktree> {
.background_executor()
.spawn(async move { fs::metadata(&abs_path) })
.await?
- .modified()?;
+ .modified()?
+ .duration_since(UNIX_EPOCH)?;
let state = handle.read_with(&cx, |tree, _| {
let mut handles = tree.as_local().unwrap().handles.lock();
handles
@@ -1456,18 +1482,20 @@ impl WorktreeHandle for ModelHandle<Worktree> {
is_deleted: false,
mtime,
id,
+ rpc: None,
}
} else {
FileHandleState {
path: path.clone(),
- is_deleted: !tree.path_is_pending(path),
+ is_deleted: !tree.path_is_pending(&path),
mtime,
id,
+ rpc: None,
}
};
let state = Arc::new(Mutex::new(handle_state.clone()));
- handles.insert(handle_state.path, Arc::downgrade(&state));
+ handles.insert(path, Arc::downgrade(&state));
state
})
});
@@ -1481,18 +1509,46 @@ impl WorktreeHandle for ModelHandle<Worktree> {
let worktree_id = tree.id;
let connection_id = tree.connection_id;
let rpc = tree.rpc.clone();
+ let handles = tree.handles.clone();
cx.spawn(|cx| async move {
- let response = rpc
- .request(
- connection_id,
- proto::OpenFile {
- worktree_id: worktree_id as u64,
- path: path.to_string_lossy().to_string(),
- },
- )
- .await?;
+ let state = handles
+ .lock()
+ .entry(path.clone())
+ .or_insert_with(|| Arc::new(AsyncMutex::new(Weak::new())))
+ .clone();
- todo!()
+ let mut state = state.lock().await;
+ if let Some(state) = Weak::upgrade(&state) {
+ Ok(FileHandle {
+ worktree: handle,
+ state,
+ })
+ } else {
+ let response = rpc
+ .request(
+ connection_id,
+ proto::OpenFile {
+ worktree_id: worktree_id as u64,
+ path: path.to_string_lossy().to_string(),
+ },
+ )
+ .await?;
+ let is_deleted = handle.read_with(&cx, |tree, _| {
+ tree.entry_for_path(&path).is_some() || !tree.path_is_pending(&path)
+ });
+ let new_state = Arc::new(Mutex::new(FileHandleState {
+ path,
+ is_deleted,
+ mtime: Duration::from_secs(response.mtime),
+ id: response.id,
+ rpc: Some((connection_id, rpc)),
+ }));
+ *state = Arc::downgrade(&new_state);
+ Ok(FileHandle {
+ worktree: handle,
+ state: new_state,
+ })
+ }
})
}
}
@@ -1646,7 +1702,7 @@ mod tests {
use std::env;
use std::fmt::Write;
use std::os::unix;
- use std::time::{SystemTime, UNIX_EPOCH};
+ use std::time::SystemTime;
#[gpui::test]
async fn test_populate_and_search(mut cx: gpui::TestAppContext) {