From 1cae4758ccfdc2486da79082943e283fed4ec131 Mon Sep 17 00:00:00 2001
From: KCaverly
Date: Mon, 21 Aug 2023 11:29:45 +0200
Subject: [PATCH 1/6] manage for edge case in which file documents are larger
 than the allowable limit

---
 crates/semantic_index/src/db.rs             | 38 +++++++++--------
 crates/semantic_index/src/semantic_index.rs | 47 +++++++++++++++++++--
 2 files changed, 63 insertions(+), 22 deletions(-)

diff --git a/crates/semantic_index/src/db.rs b/crates/semantic_index/src/db.rs
index e8c929c995..e57a5d733f 100644
--- a/crates/semantic_index/src/db.rs
+++ b/crates/semantic_index/src/db.rs
@@ -156,25 +156,27 @@ impl VectorDatabase {
         mtime: SystemTime,
         documents: Vec<Document>,
     ) -> Result<()> {
-        // Write to files table, and return generated id.
-        self.db.execute(
-            "
-            DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2;
-            ",
-            params![worktree_id, path.to_str()],
-        )?;
+        // Return the existing ID, if both the file and mtime match
         let mtime = Timestamp::from(mtime);
-        self.db.execute(
-            "
-            INSERT INTO files
-            (worktree_id, relative_path, mtime_seconds, mtime_nanos)
-            VALUES
-            (?1, ?2, $3, $4);
-            ",
-            params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
-        )?;
-
-        let file_id = self.db.last_insert_rowid();
+        let mut existing_id_query = self.db.prepare("SELECT id FROM files WHERE worktree_id = ?1 AND relative_path = ?2 AND mtime_seconds = ?3 AND mtime_nanos = ?4")?;
+        let existing_id = existing_id_query
+            .query_row(
+                params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
+                |row| Ok(row.get::<_, i64>(0)?),
+            )
+            .map_err(|err| anyhow!(err));
+        let file_id = if existing_id.is_ok() {
+            // If the file already exists, just return the existing id
+            existing_id.unwrap()
+        } else {
+            // Delete the existing row
+            self.db.execute(
+                "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2;",
+                params![worktree_id, path.to_str()],
+            )?;
+            self.db.execute("INSERT INTO files (worktree_id, relative_path, mtime_seconds, mtime_nanos) VALUES (?1, ?2, ?3, ?4);", params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos])?;
+            self.db.last_insert_rowid()
+        };
 
         // Currently inserting at approximately 3400 documents a second
         // I imagine we can speed this up with a bulk insert of some kind.
diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs
index 8c9877b9d3..dd53215203 100644
--- a/crates/semantic_index/src/semantic_index.rs
+++ b/crates/semantic_index/src/semantic_index.rs
@@ -96,6 +96,7 @@ struct ProjectState {
     _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
 }
 
+#[derive(Clone)]
 struct JobHandle {
     tx: Weak<Mutex<watch::Sender<usize>>>,
 }
@@ -389,6 +390,7 @@ impl SemanticIndex {
         embeddings_queue: &mut Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
         embed_batch_tx: &channel::Sender<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>,
     ) {
+        // Handle edge case where an individual file has more documents than the max batch size
         let should_flush = match job {
             EmbeddingJob::Enqueue {
                 documents,
@@ -397,9 +399,43 @@ impl SemanticIndex {
                 mtime,
                 job_handle,
             } => {
-                *queue_len += &documents.len();
-                embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
-                *queue_len >= EMBEDDINGS_BATCH_SIZE
+                // If the document count exceeds the embeddings batch size, recursively split the job and enqueue each half.
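
A note on the splitting strategy in PATCH 1/6 above: when a single file yields more documents than EMBEDDINGS_BATCH_SIZE, enqueue_documents_to_embed calls itself on the first batch-sized slice and again on the remainder. Below is a minimal, self-contained sketch of that recursion with simplified, hypothetical types (plain strings for documents, a small constant batch size, and a closure in place of the embed_batch channel); it illustrates the shape of the logic, not the crate's actual API:

    const BATCH_SIZE: usize = 4; // stands in for EMBEDDINGS_BATCH_SIZE

    // Split any document list larger than one batch and recurse on each half,
    // mirroring the first_job/second_job recursion in the patch; otherwise
    // enqueue, and flush once a full batch has accumulated.
    fn enqueue(docs: Vec<String>, queue: &mut Vec<String>, flush: &mut dyn FnMut(&mut Vec<String>)) {
        if docs.len() > BATCH_SIZE {
            enqueue(docs[..BATCH_SIZE].to_vec(), queue, flush);
            enqueue(docs[BATCH_SIZE..].to_vec(), queue, flush);
            return;
        }
        queue.extend(docs);
        if queue.len() >= BATCH_SIZE {
            flush(queue);
        }
    }

    fn main() {
        let mut queue = Vec::new();
        let mut batches: Vec<Vec<String>> = Vec::new();
        let docs: Vec<String> = (0..10).map(|i| format!("doc-{i}")).collect();
        enqueue(docs, &mut queue, &mut |q| batches.push(std::mem::take(q)));
        // Ten documents with a batch size of four flush two full batches and
        // leave two documents queued for a later flush.
        assert_eq!(batches.iter().map(Vec::len).collect::<Vec<_>>(), vec![4, 4]);
        assert_eq!(queue.len(), 2);
    }
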
+                if &documents.len() > &EMBEDDINGS_BATCH_SIZE {
+                    let first_job = EmbeddingJob::Enqueue {
+                        documents: documents[..EMBEDDINGS_BATCH_SIZE].to_vec(),
+                        worktree_id,
+                        path: path.clone(),
+                        mtime,
+                        job_handle: job_handle.clone(),
+                    };
+
+                    Self::enqueue_documents_to_embed(
+                        first_job,
+                        queue_len,
+                        embeddings_queue,
+                        embed_batch_tx,
+                    );
+
+                    let second_job = EmbeddingJob::Enqueue {
+                        documents: documents[EMBEDDINGS_BATCH_SIZE..].to_vec(),
+                        worktree_id,
+                        path: path.clone(),
+                        mtime,
+                        job_handle: job_handle.clone(),
+                    };
+
+                    Self::enqueue_documents_to_embed(
+                        second_job,
+                        queue_len,
+                        embeddings_queue,
+                        embed_batch_tx,
+                    );
+                    return;
+                } else {
+                    *queue_len += &documents.len();
+                    embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
+                    *queue_len >= EMBEDDINGS_BATCH_SIZE
+                }
             }
             EmbeddingJob::Flush => true,
         };
@@ -796,7 +832,10 @@ impl Drop for JobHandle {
     fn drop(&mut self) {
         if let Some(tx) = self.tx.upgrade() {
             let mut tx = tx.lock();
-            *tx.borrow_mut() -= 1;
+            // Guard against overflow, since we are cloning the JobHandle
+            if *tx.borrow() > 0 {
+                *tx.borrow_mut() -= 1;
+            };
         }
     }
 }
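
From def215af9f4cebbabf2eba1663844b4a201ea2ae Mon Sep 17 00:00:00 2001
From: KCaverly
Date: Mon, 21 Aug 2023 12:47:43 +0200
Subject: [PATCH 2/6] update job handle to ensure file count is consistent

Co-authored-by: Piotr
---
 crates/semantic_index/src/semantic_index.rs | 45 +++++++++++++++++----
 1 file changed, 38 insertions(+), 7 deletions(-)

diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs
index dd53215203..7aea0f7cfe 100644
--- a/crates/semantic_index/src/semantic_index.rs
+++ b/crates/semantic_index/src/semantic_index.rs
@@ -98,9 +98,16 @@ struct ProjectState {
 
 #[derive(Clone)]
 struct JobHandle {
-    tx: Weak<Mutex<watch::Sender<usize>>>,
+    tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
 }
 
+impl JobHandle {
+    fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
+        Self {
+            tx: Arc::new(Arc::downgrade(&tx)),
+        }
+    }
+}
 impl ProjectState {
     fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
         self.worktree_db_ids
@@ -651,7 +658,7 @@ impl SemanticIndex {
                             count += 1;
                             *job_count_tx.lock().borrow_mut() += 1;
                             let job_handle = JobHandle {
-                                tx: Arc::downgrade(&job_count_tx),
+                                tx: Arc::new(Arc::downgrade(&job_count_tx)),
                             };
                             parsing_files_tx
                                 .try_send(PendingFile {
@@ -726,6 +733,7 @@ impl SemanticIndex {
         let database_url = self.database_url.clone();
         let fs = self.fs.clone();
         cx.spawn(|this, mut cx| async move {
+            let t0 = Instant::now();
             let database = VectorDatabase::new(fs.clone(), database_url.clone()).await?;
 
             let phrase_embedding = embedding_provider
@@ -735,6 +743,11 @@ impl SemanticIndex {
                 .next()
                 .unwrap();
 
+            log::trace!(
+                "Embedding search phrase took: {:?} milliseconds",
+                t0.elapsed().as_millis()
+            );
+
             let file_ids =
                 database.retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)?;
 
@@ -809,6 +822,11 @@ impl SemanticIndex {
 
         let buffers = futures::future::join_all(tasks).await;
 
+        log::trace!(
+            "Semantic Searching took: {:?} milliseconds in total",
+            t0.elapsed().as_millis()
+        );
+
         Ok(buffers
             .into_iter()
            .zip(ranges)
@@ -830,12 +848,25 @@ impl Entity for SemanticIndex {
 
 impl Drop for JobHandle {
     fn drop(&mut self) {
-        if let Some(tx) = self.tx.upgrade() {
-            let mut tx = tx.lock();
-            // Guard against overflow, since we are cloning the JobHandle
-            if *tx.borrow() > 0 {
+        if let Some(inner) = Arc::get_mut(&mut self.tx) {
+            if let Some(tx) = inner.upgrade() {
+                let mut tx = tx.lock();
                 *tx.borrow_mut() -= 1;
-            };
+            }
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    #[test]
+    fn test_job_handle() {
+        let (job_count_tx, job_count_rx) = watch::channel_with(0);
+        let tx = Arc::new(Mutex::new(job_count_tx));
+        let job_handle = JobHandle::new(tx);
+
+        assert_eq!(1, *job_count_rx.borrow_mut());
+    }
+}

A note on PATCH 2/6 above: wrapping the weak sender in an extra Arc is what makes JobHandle safely cloneable — every clone shares one inner Arc, and Arc::get_mut succeeds only in the Drop of the last surviving clone, so the outstanding-job counter is decremented exactly once per job rather than once per clone. Below is a minimal single-threaded sketch of that pattern, with an atomic counter and a unit Arc standing in for the watch channel (hypothetical names, for illustration only):

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Counts outstanding jobs, standing in for the watch::Sender<usize> channel.
    static OUTSTANDING: AtomicUsize = AtomicUsize::new(0);

    #[derive(Clone)]
    struct JobHandle {
        // Every clone shares this allocation, as with the patch's
        // Arc<Weak<Mutex<watch::Sender<usize>>>>.
        inner: Arc<()>,
    }

    impl Drop for JobHandle {
        fn drop(&mut self) {
            // Arc::get_mut returns Some only when this is the sole strong
            // reference, i.e. the last surviving clone of this job's handle.
            if Arc::get_mut(&mut self.inner).is_some() {
                OUTSTANDING.fetch_sub(1, Ordering::SeqCst);
            }
        }
    }

    fn main() {
        OUTSTANDING.fetch_add(1, Ordering::SeqCst); // one job enqueued
        let handle = JobHandle { inner: Arc::new(()) };
        let clone = handle.clone();

        drop(handle); // a clone still lives, so the counter is untouched
        assert_eq!(OUTSTANDING.load(Ordering::SeqCst), 1);

        drop(clone); // last handle: decremented exactly once
        assert_eq!(OUTSTANDING.load(Ordering::SeqCst), 0);
    }
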
From 1a88444f2f52876eee4355a71cd7e6901850623b Mon Sep 17 00:00:00 2001
From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com>
Date: Mon, 21 Aug 2023 13:00:56 +0200
Subject: [PATCH 3/6] Increment job counter on JobHandle::new

Co-authored-by: Kyle
---
 crates/semantic_index/src/semantic_index.rs | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs
index 7aea0f7cfe..4457f55b7c 100644
--- a/crates/semantic_index/src/semantic_index.rs
+++ b/crates/semantic_index/src/semantic_index.rs
@@ -103,6 +103,7 @@ struct JobHandle {
 
 impl JobHandle {
     fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
+        *tx.lock().borrow_mut() += 1;
         Self {
             tx: Arc::new(Arc::downgrade(&tx)),
         }
@@ -656,10 +657,8 @@ impl SemanticIndex {
 
                             if !already_stored {
                                 count += 1;
-                                *job_count_tx.lock().borrow_mut() += 1;
-                                let job_handle = JobHandle {
-                                    tx: Arc::new(Arc::downgrade(&job_count_tx)),
-                                };
+
+                                let job_handle = JobHandle::new(&job_count_tx);
                                 parsing_files_tx
                                     .try_send(PendingFile {
                                         worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
@@ -865,8 +864,14 @@ mod tests {
     fn test_job_handle() {
         let (job_count_tx, job_count_rx) = watch::channel_with(0);
         let tx = Arc::new(Mutex::new(job_count_tx));
-        let job_handle = JobHandle::new(tx);
+        let job_handle = JobHandle::new(&tx);
 
-        assert_eq!(1, *job_count_rx.borrow_mut());
+        assert_eq!(1, *job_count_rx.borrow());
+        let new_job_handle = job_handle.clone();
+        assert_eq!(1, *job_count_rx.borrow());
+        drop(job_handle);
+        assert_eq!(1, *job_count_rx.borrow());
+        drop(new_job_handle);
+        assert_eq!(0, *job_count_rx.borrow());
     }
 }

From 61041b0cd1ed354f4a04c72d077adcf0ce68e567 Mon Sep 17 00:00:00 2001
From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com>
Date: Mon, 21 Aug 2023 13:23:11 +0200
Subject: [PATCH 4/6] Do not attempt to reindex a file if previous attempts
 have failed. Add doc comment to JobHandle

Co-authored-by: Kyle
---
 crates/semantic_index/src/semantic_index.rs | 17 +++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs
index 4457f55b7c..2534988196 100644
--- a/crates/semantic_index/src/semantic_index.rs
+++ b/crates/semantic_index/src/semantic_index.rs
@@ -98,6 +98,8 @@ struct ProjectState {
 
 #[derive(Clone)]
 struct JobHandle {
+    /// The outer Arc is here to count the clones of a JobHandle instance;
+    /// when the last handle to a given job is dropped, we decrement a counter (just once).
     tx: Arc<Weak<Mutex<watch::Sender<usize>>>>,
 }
 
@@ -389,6 +391,20 @@ impl SemanticIndex {
                     .await
                     .unwrap();
             }
+        } else {
+            // Insert the file in spite of failure so that future attempts to index it do not take place (unless the file is changed).
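
A note on the else branch added in PATCH 4/6 above: recording a failed file with an empty document list turns the (path, mtime) row into a tombstone, so later scans skip the file until its mtime changes. Below is a minimal sketch of that idea under simplified, hypothetical types (a HashMap stands in for the files table; it is an illustration of the skip-on-tombstone behavior, not the crate's schema):

    use std::collections::HashMap;
    use std::time::SystemTime;

    // Stands in for the files table: rows keyed by (path, mtime), holding the
    // file's documents; an empty Vec is the tombstone left by a failed embed.
    #[derive(Default)]
    struct FileIndex {
        files: HashMap<(String, SystemTime), Vec<String>>,
    }

    impl FileIndex {
        fn needs_indexing(&self, path: &str, mtime: SystemTime) -> bool {
            // Any hit on the exact (path, mtime) pair - even a tombstone -
            // means this version of the file was already attempted.
            !self.files.contains_key(&(path.to_string(), mtime))
        }

        fn record(&mut self, path: &str, mtime: SystemTime, documents: Vec<String>) {
            self.files.insert((path.to_string(), mtime), documents);
        }
    }

    fn main() {
        let mut index = FileIndex::default();
        let mtime = SystemTime::now();
        assert!(index.needs_indexing("a.rs", mtime));
        index.record("a.rs", mtime, vec![]); // embedding failed: store a tombstone
        assert!(!index.needs_indexing("a.rs", mtime)); // skipped until mtime changes
    }
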
+            for (worktree_id, documents, path, mtime, job_handle) in embeddings_queue.into_iter() {
+                db_update_tx
+                    .send(DbOperation::InsertFile {
+                        worktree_id,
+                        documents: vec![],
+                        path,
+                        mtime,
+                        job_handle,
+                    })
+                    .await
+                    .unwrap();
+            }
         }
     }
 
@@ -848,6 +864,7 @@ impl Entity for SemanticIndex {
 impl Drop for JobHandle {
     fn drop(&mut self) {
         if let Some(inner) = Arc::get_mut(&mut self.tx) {
+            // This is the last instance of the JobHandle (regardless of its origin - whether it was cloned or not)
             if let Some(tx) = inner.upgrade() {
                 let mut tx = tx.lock();
                 *tx.borrow_mut() -= 1;

From 67a48ec1068e2ca1532180913178ae985a8b31f6 Mon Sep 17 00:00:00 2001
From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com>
Date: Mon, 21 Aug 2023 13:30:32 +0200
Subject: [PATCH 5/6] project_search: use search history's current entry as a
 tab name.

Previously the tab name for Semantic Search was not updated, as we didn't
have an active query to go off of.

Co-authored-by: Kyle
---
 crates/search/src/project_search.rs | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs
index dada928d6e..196d5589f4 100644
--- a/crates/search/src/project_search.rs
+++ b/crates/search/src/project_search.rs
@@ -499,10 +499,14 @@ impl Item for ProjectSearchView {
                 .with_margin_right(tab_theme.spacing),
         )
         .with_child({
-            let tab_name: Option<Cow<str>> =
-                self.model.read(cx).active_query.as_ref().map(|query| {
-                    let query_text =
-                        util::truncate_and_trailoff(query.as_str(), MAX_TAB_TITLE_LEN);
+            let tab_name: Option<Cow<str>> = self
+                .model
+                .read(cx)
+                .search_history
+                .current()
+                .as_ref()
+                .map(|query| {
+                    let query_text = util::truncate_and_trailoff(query, MAX_TAB_TITLE_LEN);
                     query_text.into()
                 });
             Label::new(

From c68b518aecc119b36886096d49bfe04b96ced74a Mon Sep 17 00:00:00 2001
From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com>
Date: Mon, 21 Aug 2023 13:31:45 +0200
Subject: [PATCH 6/6] chore: fix compiler warning

Co-authored-by: Kyle
---
 crates/semantic_index/src/semantic_index.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs
index 2534988196..5aaecac733 100644
--- a/crates/semantic_index/src/semantic_index.rs
+++ b/crates/semantic_index/src/semantic_index.rs
@@ -393,7 +393,7 @@ impl SemanticIndex {
             }
         } else {
             // Insert the file in spite of failure so that future attempts to index it do not take place (unless the file is changed).
-            for (worktree_id, documents, path, mtime, job_handle) in embeddings_queue.into_iter() {
+            for (worktree_id, _, path, mtime, job_handle) in embeddings_queue.into_iter() {
                 db_update_tx
                     .send(DbOperation::InsertFile {
                         worktree_id,
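
A note on PATCH 5/6 above: the tab title is now derived from the search history's current entry, clipped by util::truncate_and_trailoff. As a rough sketch of what such a clipping helper does — an assumption for illustration, not Zed's actual util implementation — it truncates the query to a maximum character count and appends an ellipsis when anything was cut:

    // Assumed behavior of a truncate-and-trailoff helper (illustration only):
    // clip to at most `max_chars` characters, marking truncation with "…".
    fn truncate_and_trailoff(s: &str, max_chars: usize) -> String {
        if s.chars().count() <= max_chars {
            return s.to_string();
        }
        let clipped: String = s.chars().take(max_chars).collect();
        format!("{clipped}…")
    }

    fn main() {
        assert_eq!(truncate_and_trailoff("short query", 24), "short query");
        assert_eq!(truncate_and_trailoff("a very long search query", 10), "a very lon…");
    }
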