diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index dada928d6e..196d5589f4 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -499,10 +499,14 @@ impl Item for ProjectSearchView { .with_margin_right(tab_theme.spacing), ) .with_child({ - let tab_name: Option> = - self.model.read(cx).active_query.as_ref().map(|query| { - let query_text = - util::truncate_and_trailoff(query.as_str(), MAX_TAB_TITLE_LEN); + let tab_name: Option> = self + .model + .read(cx) + .search_history + .current() + .as_ref() + .map(|query| { + let query_text = util::truncate_and_trailoff(query, MAX_TAB_TITLE_LEN); query_text.into() }); Label::new( diff --git a/crates/semantic_index/src/db.rs b/crates/semantic_index/src/db.rs index e8c929c995..e57a5d733f 100644 --- a/crates/semantic_index/src/db.rs +++ b/crates/semantic_index/src/db.rs @@ -156,25 +156,27 @@ impl VectorDatabase { mtime: SystemTime, documents: Vec, ) -> Result<()> { - // Write to files table, and return generated id. - self.db.execute( - " - DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2; - ", - params![worktree_id, path.to_str()], - )?; + // Return the existing ID, if both the file and mtime match let mtime = Timestamp::from(mtime); - self.db.execute( - " - INSERT INTO files - (worktree_id, relative_path, mtime_seconds, mtime_nanos) - VALUES - (?1, ?2, $3, $4); - ", - params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos], - )?; - - let file_id = self.db.last_insert_rowid(); + let mut existing_id_query = self.db.prepare("SELECT id FROM files WHERE worktree_id = ?1 AND relative_path = ?2 AND mtime_seconds = ?3 AND mtime_nanos = ?4")?; + let existing_id = existing_id_query + .query_row( + params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos], + |row| Ok(row.get::<_, i64>(0)?), + ) + .map_err(|err| anyhow!(err)); + let file_id = if existing_id.is_ok() { + // If already exists, just return the existing id + existing_id.unwrap() + } else { + // Delete Existing Row + self.db.execute( + "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2;", + params![worktree_id, path.to_str()], + )?; + self.db.execute("INSERT INTO files (worktree_id, relative_path, mtime_seconds, mtime_nanos) VALUES (?1, ?2, ?3, ?4);", params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos])?; + self.db.last_insert_rowid() + }; // Currently inserting at approximately 3400 documents a second // I imagine we can speed this up with a bulk insert of some kind. diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 8c9877b9d3..5aaecac733 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -96,10 +96,21 @@ struct ProjectState { _outstanding_job_count_tx: Arc>>, } +#[derive(Clone)] struct JobHandle { - tx: Weak>>, + /// The outer Arc is here to count the clones of a JobHandle instance; + /// when the last handle to a given job is dropped, we decrement a counter (just once). + tx: Arc>>>, } +impl JobHandle { + fn new(tx: &Arc>>) -> Self { + *tx.lock().borrow_mut() += 1; + Self { + tx: Arc::new(Arc::downgrade(&tx)), + } + } +} impl ProjectState { fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option { self.worktree_db_ids @@ -380,6 +391,20 @@ impl SemanticIndex { .await .unwrap(); } + } else { + // Insert the file in spite of failure so that future attempts to index it do not take place (unless the file is changed). + for (worktree_id, _, path, mtime, job_handle) in embeddings_queue.into_iter() { + db_update_tx + .send(DbOperation::InsertFile { + worktree_id, + documents: vec![], + path, + mtime, + job_handle, + }) + .await + .unwrap(); + } } } @@ -389,6 +414,7 @@ impl SemanticIndex { embeddings_queue: &mut Vec<(i64, Vec, PathBuf, SystemTime, JobHandle)>, embed_batch_tx: &channel::Sender, PathBuf, SystemTime, JobHandle)>>, ) { + // Handle edge case where individual file has more documents than max batch size let should_flush = match job { EmbeddingJob::Enqueue { documents, @@ -397,9 +423,43 @@ impl SemanticIndex { mtime, job_handle, } => { - *queue_len += &documents.len(); - embeddings_queue.push((worktree_id, documents, path, mtime, job_handle)); - *queue_len >= EMBEDDINGS_BATCH_SIZE + // If documents is greater than embeddings batch size, recursively batch existing rows. + if &documents.len() > &EMBEDDINGS_BATCH_SIZE { + let first_job = EmbeddingJob::Enqueue { + documents: documents[..EMBEDDINGS_BATCH_SIZE].to_vec(), + worktree_id, + path: path.clone(), + mtime, + job_handle: job_handle.clone(), + }; + + Self::enqueue_documents_to_embed( + first_job, + queue_len, + embeddings_queue, + embed_batch_tx, + ); + + let second_job = EmbeddingJob::Enqueue { + documents: documents[EMBEDDINGS_BATCH_SIZE..].to_vec(), + worktree_id, + path: path.clone(), + mtime, + job_handle: job_handle.clone(), + }; + + Self::enqueue_documents_to_embed( + second_job, + queue_len, + embeddings_queue, + embed_batch_tx, + ); + return; + } else { + *queue_len += &documents.len(); + embeddings_queue.push((worktree_id, documents, path, mtime, job_handle)); + *queue_len >= EMBEDDINGS_BATCH_SIZE + } } EmbeddingJob::Flush => true, }; @@ -613,10 +673,8 @@ impl SemanticIndex { if !already_stored { count += 1; - *job_count_tx.lock().borrow_mut() += 1; - let job_handle = JobHandle { - tx: Arc::downgrade(&job_count_tx), - }; + + let job_handle = JobHandle::new(&job_count_tx); parsing_files_tx .try_send(PendingFile { worktree_db_id: db_ids_by_worktree_id[&worktree.id()], @@ -690,6 +748,7 @@ impl SemanticIndex { let database_url = self.database_url.clone(); let fs = self.fs.clone(); cx.spawn(|this, mut cx| async move { + let t0 = Instant::now(); let database = VectorDatabase::new(fs.clone(), database_url.clone()).await?; let phrase_embedding = embedding_provider @@ -699,6 +758,11 @@ impl SemanticIndex { .next() .unwrap(); + log::trace!( + "Embedding search phrase took: {:?} milliseconds", + t0.elapsed().as_millis() + ); + let file_ids = database.retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes)?; @@ -773,6 +837,11 @@ impl SemanticIndex { let buffers = futures::future::join_all(tasks).await; + log::trace!( + "Semantic Searching took: {:?} milliseconds in total", + t0.elapsed().as_millis() + ); + Ok(buffers .into_iter() .zip(ranges) @@ -794,9 +863,32 @@ impl Entity for SemanticIndex { impl Drop for JobHandle { fn drop(&mut self) { - if let Some(tx) = self.tx.upgrade() { - let mut tx = tx.lock(); - *tx.borrow_mut() -= 1; + if let Some(inner) = Arc::get_mut(&mut self.tx) { + // This is the last instance of the JobHandle (regardless of it's origin - whether it was cloned or not) + if let Some(tx) = inner.upgrade() { + let mut tx = tx.lock(); + *tx.borrow_mut() -= 1; + } } } } + +#[cfg(test)] +mod tests { + + use super::*; + #[test] + fn test_job_handle() { + let (job_count_tx, job_count_rx) = watch::channel_with(0); + let tx = Arc::new(Mutex::new(job_count_tx)); + let job_handle = JobHandle::new(&tx); + + assert_eq!(1, *job_count_rx.borrow()); + let new_job_handle = job_handle.clone(); + assert_eq!(1, *job_count_rx.borrow()); + drop(job_handle); + assert_eq!(1, *job_count_rx.borrow()); + drop(new_job_handle); + assert_eq!(0, *job_count_rx.borrow()); + } +}