From a45c8c380f04f5f39bb360374007379ea1f96781 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 7 Sep 2023 15:25:23 +0200 Subject: [PATCH] :lipstick: --- crates/semantic_index/src/embedding_queue.rs | 81 ++++++++++---------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/crates/semantic_index/src/embedding_queue.rs b/crates/semantic_index/src/embedding_queue.rs index 104a4eb8ee..3026eef9ae 100644 --- a/crates/semantic_index/src/embedding_queue.rs +++ b/crates/semantic_index/src/embedding_queue.rs @@ -108,54 +108,55 @@ impl EmbeddingQueue { let finished_files_tx = self.finished_files_tx.clone(); let embedding_provider = self.embedding_provider.clone(); - self.executor.spawn(async move { - let mut spans = Vec::new(); - for fragment in &batch { - let file = fragment.file.lock(); - spans.extend( - { + self.executor + .spawn(async move { + let mut spans = Vec::new(); + for fragment in &batch { + let file = fragment.file.lock(); + spans.extend( file.spans[fragment.span_range.clone()] - .iter().filter(|d| d.embedding.is_none()) - .map(|d| d.content.clone()) - } - ); - } - - // If spans is 0, just send the fragment to the finished files if its the last one. - if spans.len() == 0 { - for fragment in batch.clone() { - if let Some(file) = Arc::into_inner(fragment.file) { - finished_files_tx.try_send(file.into_inner()).unwrap(); - } + .iter() + .filter(|d| d.embedding.is_none()) + .map(|d| d.content.clone()), + ); } - return; - }; - - match embedding_provider.embed_batch(spans).await { - Ok(embeddings) => { - let mut embeddings = embeddings.into_iter(); - for fragment in batch { - for span in - &mut fragment.file.lock().spans[fragment.span_range.clone()].iter_mut().filter(|d| d.embedding.is_none()) - { - if let Some(embedding) = embeddings.next() { - span.embedding = Some(embedding); - } else { - log::error!("number of embeddings returned different from number of documents"); - } - } + // If spans is 0, just send the fragment to the finished files if its the last one. + if spans.is_empty() { + for fragment in batch.clone() { if let Some(file) = Arc::into_inner(fragment.file) { finished_files_tx.try_send(file.into_inner()).unwrap(); } } + return; + }; + + match embedding_provider.embed_batch(spans).await { + Ok(embeddings) => { + let mut embeddings = embeddings.into_iter(); + for fragment in batch { + for span in &mut fragment.file.lock().spans[fragment.span_range.clone()] + .iter_mut() + .filter(|d| d.embedding.is_none()) + { + if let Some(embedding) = embeddings.next() { + span.embedding = Some(embedding); + } else { + log::error!("number of embeddings != number of documents"); + } + } + + if let Some(file) = Arc::into_inner(fragment.file) { + finished_files_tx.try_send(file.into_inner()).unwrap(); + } + } + } + Err(error) => { + log::error!("{:?}", error); + } } - Err(error) => { - log::error!("{:?}", error); - } - } - }) - .detach(); + }) + .detach(); } pub fn finished_files(&self) -> channel::Receiver {