From 09fd99b1e3380f239c4971ea625a526e87b1d338 Mon Sep 17 00:00:00 2001
From: KCaverly
Date: Wed, 23 Aug 2023 15:09:15 +0200
Subject: [PATCH] moved semantic_index project initialization to queue and
 channel method

---
 crates/semantic_index/src/semantic_index.rs | 267 ++++++++------------
 1 file changed, 108 insertions(+), 159 deletions(-)

diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs
index 79e649838a..ffe6e74a6d 100644
--- a/crates/semantic_index/src/semantic_index.rs
+++ b/crates/semantic_index/src/semantic_index.rs
@@ -15,8 +15,9 @@ use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task,
 use language::{Anchor, Buffer, Language, LanguageRegistry};
 use parking_lot::Mutex;
 use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
+use postage::stream::Stream;
 use postage::watch;
-use project::{search::PathMatcher, Fs, Project, WorktreeId};
+use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, WorktreeId};
 use smol::channel;
 use std::{
     cmp::Ordering,
@@ -96,7 +97,8 @@ struct ProjectState {
     subscription: gpui::Subscription,
     outstanding_job_count_rx: watch::Receiver<usize>,
     _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
-    queue: HashMap<WorktreeId, Vec<IndexOperation>>,
+    job_queue_tx: channel::Sender<IndexOperation>,
+    _queue_update_task: Task<()>,
 }
 
 #[derive(Clone)]
@@ -116,6 +118,7 @@ impl JobHandle {
 }
 impl ProjectState {
     fn new(
+        cx: &mut AppContext,
         subscription: gpui::Subscription,
         worktree_db_ids: Vec<(WorktreeId, i64)>,
         worktree_file_mtimes: HashMap<WorktreeId, HashMap<PathBuf, SystemTime>>,
@@ -125,29 +128,51 @@ impl ProjectState {
         let (job_count_tx, job_count_rx) = watch::channel_with(0);
         let job_count_tx = Arc::new(Mutex::new(job_count_tx));
 
+        let (job_queue_tx, job_queue_rx) = channel::unbounded();
+        let _queue_update_task = cx.background().spawn({
+            let mut worktree_queue = Vec::new();
+            async move {
+                while let Ok(operation) = job_queue_rx.recv().await {
+                    Self::update_queue(&mut worktree_queue, operation);
+                }
+            }
+        });
+
         Self {
             worktree_db_ids,
             worktree_file_mtimes,
             outstanding_job_count_rx,
             _outstanding_job_count_tx,
             subscription,
-            queue: HashMap::new(),
+            _queue_update_task,
+            job_queue_tx,
         }
     }
 
-    fn add_to_queue(&mut self, worktree_id: WorktreeId, operation: IndexOperation) {
-        if let Some(worktree_queue) = self.queue.get_mut(&worktree_id) {
-            worktree_queue.push(operation);
-        } else {
-            self.queue.insert(worktree_id, vec![operation]);
-        }
+    pub fn get_outstanding_count(&self) -> usize {
+        *self.outstanding_job_count_rx.borrow()
     }
 
-    fn pop(&mut self) -> Option<IndexOperation> {
-        self.queue
-            .iter_mut()
-            .next()
-            .and_then(|(_, mut entry)| entry.pop())
+    fn update_queue(queue: &mut Vec<IndexOperation>, operation: IndexOperation) {
+        match operation {
+            // Flushing drains the entire queue, forwarding each buffered
+            // operation to the channel it was queued with.
+            IndexOperation::FlushQueue => {
+                for op in queue.drain(..) {
+                    match op {
+                        IndexOperation::IndexFile { payload, tx } => {
+                            tx.try_send(payload).ok();
+                        }
+                        IndexOperation::DeleteFile { payload, tx } => {
+                            tx.try_send(payload).ok();
+                        }
+                        _ => {}
+                    }
+                }
+            }
+            // TODO: This has to accommodate duplicate files to index.
+            _ => queue.push(operation),
+        }
     }
 
     fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
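
Note: the queue logic above is easiest to see in isolation. The sketch below reproduces the
pattern with plain smol primitives standing in for gpui's background executor: callers push
operations into an unbounded channel, a single background task owns the Vec that backs the
queue, and a flush message drains every buffered operation into the channel it was queued
with. All names here are illustrative; Op is a stand-in for the patch's IndexOperation.

    use smol::channel;

    enum Op {
        // An operation carries its payload together with the channel that
        // should eventually receive it, mirroring IndexOperation::IndexFile.
        Index {
            payload: String,
            tx: channel::Sender<String>,
        },
        // Mirrors IndexOperation::FlushQueue.
        Flush,
    }

    fn update_queue(queue: &mut Vec<Op>, operation: Op) {
        match operation {
            // Drain the whole queue, forwarding each payload to its channel.
            Op::Flush => {
                for op in queue.drain(..) {
                    if let Op::Index { payload, tx } = op {
                        tx.try_send(payload).ok();
                    }
                }
            }
            // Everything else is buffered until the next flush.
            _ => queue.push(operation),
        }
    }

    fn main() {
        smol::block_on(async {
            let (op_tx, op_rx) = channel::unbounded();
            let (out_tx, out_rx) = channel::unbounded();

            // The worker owns the queue; callers only ever hold op_tx.
            let worker = smol::spawn(async move {
                let mut queue = Vec::new();
                while let Ok(op) = op_rx.recv().await {
                    update_queue(&mut queue, op);
                }
            });

            op_tx
                .send(Op::Index { payload: "a.rs".into(), tx: out_tx })
                .await
                .unwrap();
            op_tx.send(Op::Flush).await.unwrap();
            assert_eq!(out_rx.recv().await.unwrap(), "a.rs");

            // Closing the operation channel lets the worker exit.
            drop(op_tx);
            worker.await;
        });
    }
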
@@ -185,10 +210,16 @@ pub struct PendingFile {
     job_handle: JobHandle,
 }
 
-#[derive(Clone)]
 enum IndexOperation {
-    IndexFile { file: PendingFile },
-    DeleteFile { file: PendingFile },
+    IndexFile {
+        payload: PendingFile,
+        tx: channel::Sender<PendingFile>,
+    },
+    DeleteFile {
+        payload: DbOperation,
+        tx: channel::Sender<DbOperation>,
+    },
+    FlushQueue,
 }
 
 pub struct SearchResult {
@@ -621,6 +652,52 @@ impl SemanticIndex {
         })
     }
 
+    // pub fn project_entries_changed(
+    //     &self,
+    //     project: ModelHandle<Project>,
+    //     changes: &Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
+    //     cx: &ModelContext<Self>,
+    //     worktree_id: &WorktreeId,
+    // ) -> Result<()> {
+    //     let parsing_files_tx = self.parsing_files_tx.clone();
+    //     let db_update_tx = self.db_update_tx.clone();
+    //     let (job_queue_tx, outstanding_job_tx, worktree_db_id) = {
+    //         let state = self.projects.get(&project.downgrade());
+    //         if state.is_none() {
+    //             return Err(anyhow!("project not yet initialized"));
+    //         }
+    //         let state = state.unwrap();
+    //         (
+    //             state.job_queue_tx.clone(),
+    //             state._outstanding_job_count_tx.clone(),
+    //             state.db_id_for_worktree_id(*worktree_id),
+    //         )
+    //     };
+
+    //     for (path, entry_id, path_change) in changes.iter() {
+    //         match path_change {
+    //             PathChange::AddedOrUpdated => {
+    //                 let job_handle = JobHandle::new(&outstanding_job_tx);
+    //                 job_queue_tx.try_send(IndexOperation::IndexFile {
+    //                     payload: PendingFile {
+    //                         worktree_db_id,
+    //                         relative_path: path,
+    //                         absolute_path,
+    //                         language,
+    //                         modified_time,
+    //                         job_handle,
+    //                     },
+    //                     tx: parsing_files_tx,
+    //                 })
+    //             }
+    //             PathChange::Removed => {}
+    //             _ => {}
+    //         }
+    //     }
+
+    //     Ok(())
+    // }
+
     pub fn initialize_project(
         &mut self,
         project: ModelHandle<Project>,
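
Note: the commented-out project_entries_changed above only drafts the AddedOrUpdated arm.
One possible shape for the Removed arm, reusing the patch's DeleteFile variant so deletions
travel through the same queue as indexing work, is sketched below. This is a sketch against
the draft, not working code: worktree_db_id, db_update_tx, and job_queue_tx are assumed to
be in scope exactly as in the draft, and the added/updated arm still needs the language and
mtime lookups before it can build a real PendingFile.

    for (path, _entry_id, change) in changes.iter() {
        match change {
            PathChange::Removed => {
                // Deletions skip parsing entirely: the payload is already a
                // database operation, and tx points at the db-update channel.
                job_queue_tx
                    .try_send(IndexOperation::DeleteFile {
                        payload: DbOperation::Delete {
                            worktree_id: worktree_db_id,
                            path: path.to_path_buf(),
                        },
                        tx: db_update_tx.clone(),
                    })
                    .ok();
            }
            // Added or updated files become IndexFile operations as drafted.
            _ => {}
        }
    }
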
@@ -653,6 +730,7 @@ impl SemanticIndex {
         });
 
         let language_registry = self.language_registry.clone();
+        let parsing_files_tx = self.parsing_files_tx.clone();
 
         cx.spawn(|this, mut cx| async move {
             futures::future::join_all(worktree_scans_complete).await;
@@ -686,24 +764,13 @@ impl SemanticIndex {
             let (job_count_tx, job_count_rx) = watch::channel_with(0);
             let job_count_tx = Arc::new(Mutex::new(job_count_tx));
             let job_count_tx_longlived = job_count_tx.clone();
-            // this.update(&mut cx, |this, _| {
-            //     let project_state = ProjectState::new(
-            //         _subscription,
-            //         worktree_db_ids,
-            //         worktree_file_mtimes.clone(),
-            //         job_count_rx.clone(),
-            //         job_count_tx.clone(),
-            //     );
-            //     this.projects.insert(project.downgrade(), project_state);
-            // });
             let worktree_file_mtimes_all = worktree_file_mtimes.clone();
 
             let worktree_files = cx
                 .background()
                 .spawn(async move {
-                    let mut worktree_files = HashMap::new();
+                    let mut worktree_files = Vec::new();
                     for worktree in worktrees.into_iter() {
-                        let mut candidate_files = Vec::new();
                         let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                         for file in worktree.files(false, 0) {
                             let absolute_path = worktree.absolutize(&file.path);
@@ -730,8 +797,8 @@ impl SemanticIndex {
 
                                 if !already_stored {
                                     let job_handle = JobHandle::new(&job_count_tx);
-                                    candidate_files.push(IndexOperation::IndexFile {
-                                        file: PendingFile {
+                                    worktree_files.push(IndexOperation::IndexFile {
+                                        payload: PendingFile {
                                             worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
                                             relative_path: path_buf,
                                             absolute_path,
@@ -739,12 +806,11 @@ impl SemanticIndex {
                                             job_handle,
                                             modified_time: file.mtime,
                                         },
+                                        tx: parsing_files_tx.clone(),
                                     });
                                 }
                             }
                         }
-
-                        worktree_files.insert(worktree.id(), candidate_files);
                     }
 
                     anyhow::Ok(worktree_files)
@@ -753,6 +819,7 @@ impl SemanticIndex {
 
             this.update(&mut cx, |this, cx| {
                 let project_state = ProjectState::new(
+                    cx,
                     _subscription,
                     worktree_db_ids,
                     worktree_file_mtimes_all,
@@ -761,10 +828,8 @@ impl SemanticIndex {
                 );
 
                 if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
-                    for (worktree_id, index_operations) in &worktree_files {
-                        for op in index_operations {
-                            project_state.add_to_queue(*worktree_id, op.clone());
-                        }
+                    for op in worktree_files {
+                        project_state.job_queue_tx.try_send(op).ok();
                     }
                 }
 
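
Note: with this change, index_project reports progress through the watch channel rather
than by counting queued entries. A standalone sketch of that bookkeeping, using the same
postage::watch API the patch relies on (JobHandle::new increments the count when a job is
created, and dropping the last handle decrements it); the assert lines are illustrative:

    use postage::watch;

    fn main() {
        // channel_with seeds the watch with an initial value, as in
        // ProjectState::new above.
        let (mut job_count_tx, job_count_rx) = watch::channel_with(0usize);

        *job_count_tx.borrow_mut() += 1; // a job starts (JobHandle::new)
        assert_eq!(*job_count_rx.borrow(), 1); // observers see the latest count

        *job_count_tx.borrow_mut() -= 1; // the last handle to the job is dropped
        assert_eq!(*job_count_rx.borrow(), 0);
    }
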
@@ -791,134 +856,18 @@ impl SemanticIndex {
         let parsing_files_tx = self.parsing_files_tx.clone();
         let db_update_tx = self.db_update_tx.clone();
         let job_count_rx = state.outstanding_job_count_rx.clone();
-        let count = state.queue.values().map(Vec::len).sum();
+        let count = state.get_outstanding_count();
+
         cx.spawn(|this, mut cx| async move {
             this.update(&mut cx, |this, _| {
-                let Some(mut state) = this.projects.get_mut(&project.downgrade()) else {
+                let Some(state) = this.projects.get_mut(&project.downgrade()) else {
                     return;
                 };
-                let Some(mut index_operation) = state.pop() else { return; };
-                let _ = match index_operation {
-                    IndexOperation::IndexFile { file } => {
-                        parsing_files_tx.try_send(file);
-                    }
-                    IndexOperation::DeleteFile { file } => {
-                        db_update_tx.try_send(DbOperation::Delete {
-                            worktree_id: file.worktree_db_id,
-                            path: file.relative_path,
-                        });
-                    }
-                };
+                state.job_queue_tx.try_send(IndexOperation::FlushQueue).ok();
             });
         })
         .detach();
 
         Task::Ready(Some(Ok((count, job_count_rx))))
-
-        // cx.spawn(|this, mut cx| async move {
-        //     futures::future::join_all(worktree_scans_complete).await;
-
-        //     let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
-
-        //     let worktrees = project.read_with(&cx, |project, cx| {
-        //         project
-        //             .worktrees(cx)
-        //             .map(|worktree| worktree.read(cx).snapshot())
-        //             .collect::<Vec<_>>()
-        //     });
-
-        //     let mut worktree_file_mtimes = HashMap::new();
-        //     let mut db_ids_by_worktree_id = HashMap::new();
-        //     for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
-        //         let db_id = db_id?;
-        //         db_ids_by_worktree_id.insert(worktree.id(), db_id);
-        //         worktree_file_mtimes.insert(
-        //             worktree.id(),
-        //             this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
-        //                 .await?,
-        //         );
-        //     }
-
-        //     let worktree_db_ids = db_ids_by_worktree_id
-        //         .iter()
-        //         .map(|(a, b)| (*a, *b))
-        //         .collect();
-
-        //     let (job_count_tx, job_count_rx) = watch::channel_with(0);
-        //     let job_count_tx = Arc::new(Mutex::new(job_count_tx));
-        //     this.update(&mut cx, |this, _| {
-        //         let project_state = ProjectState::new(
-        //             _subscription,
-        //             worktree_db_ids,
-        //             worktree_file_mtimes.clone(),
-        //             job_count_rx.clone(),
-        //             job_count_tx.clone(),
-        //         );
-        //         this.projects.insert(project.downgrade(), project_state);
-        //     });
-
-        //     cx.background()
-        //         .spawn(async move {
-        //             let mut count = 0;
-        //             for worktree in worktrees.into_iter() {
-        //                 let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
-        //                 for file in worktree.files(false, 0) {
-        //                     let absolute_path = worktree.absolutize(&file.path);
-
-        //                     if let Ok(language) = language_registry
-        //                         .language_for_file(&absolute_path, None)
-        //                         .await
-        //                     {
-        //                         if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
-        //                             && &language.name().as_ref() != &"Markdown"
-        //                             && language
-        //                                 .grammar()
-        //                                 .and_then(|grammar| grammar.embedding_config.as_ref())
-        //                                 .is_none()
-        //                         {
-        //                             continue;
-        //                         }
-
-        //                         let path_buf = file.path.to_path_buf();
-        //                         let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
-        //                         let already_stored = stored_mtime
-        //                             .map_or(false, |existing_mtime| existing_mtime == file.mtime);
-
-        //                         if !already_stored {
-        //                             count += 1;
-
-        //                             let job_handle = JobHandle::new(&job_count_tx);
-        //                             parsing_files_tx
-        //                                 .try_send(PendingFile {
-        //                                     worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
-        //                                     relative_path: path_buf,
-        //                                     absolute_path,
-        //                                     language,
-        //                                     job_handle,
-        //                                     modified_time: file.mtime,
-        //                                 })
-        //                                 .unwrap();
-        //                         }
-        //                     }
-        //                 }
-        //                 for file in file_mtimes.keys() {
-        //                     db_update_tx
-        //                         .try_send(DbOperation::Delete {
-        //                             worktree_id: db_ids_by_worktree_id[&worktree.id()],
-        //                             path: file.to_owned(),
-        //                         })
-        //                         .unwrap();
-        //                 }
-        //             }
-
-        //             log::trace!(
-        //                 "walking worktree took {:?} milliseconds",
-        //                 t0.elapsed().as_millis()
-        //             );
-        //             anyhow::Ok((count, job_count_rx))
-        //         })
-        //         .await
-        // })
     }
 
     pub fn outstanding_job_count_rx(