From 1b9a3e27e07b449191f3887b8cbf9e12eb6b488e Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Mon, 25 Sep 2023 11:07:22 -0700 Subject: [PATCH] merged_tree: read before/after trees concurrently I'm going to rewrite `TreeDiffIterator` to fetch one level (depth) of the tree at a time and concurrently. One step towards that is to convert the iterator to a `Stream`. I'd like to do that by making the current `Iterator` implementation call the new `Stream` implementation. However, we can't call `futures::executor::block_on()` on a future that itself calls `futures::executor::block_on()` (as `Store::read_tree()` does), so the first step is to bubble up the async-ness a bit. This patch does that by fetching both sides of the diff concurrently. That should give close to a 2x speedup on high-latency backends. (It doesn't help with our backend at Google, however, because we have a daemon process that does some speculative prefetching that usually downloads the child trees anyway.) --- lib/src/merged_tree.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/lib/src/merged_tree.rs b/lib/src/merged_tree.rs index 38b144e17..c650e9869 100644 --- a/lib/src/merged_tree.rs +++ b/lib/src/merged_tree.rs @@ -20,6 +20,7 @@ use std::iter::zip; use std::sync::Arc; use std::{iter, vec}; +use futures::stream::StreamExt; use itertools::Itertools; use crate::backend::{BackendError, BackendResult, ConflictId, MergedTreeId, TreeId, TreeValue}; @@ -816,17 +817,25 @@ impl<'matcher> TreeDiffIterator<'matcher> { Self { stack, matcher } } - fn single_tree(store: &Arc<Store>, dir: &RepoPath, value: Option<&TreeValue>) -> Tree { + async fn single_tree(store: &Arc<Store>, dir: &RepoPath, value: Option<&TreeValue>) -> Tree { match value { - Some(TreeValue::Tree(tree_id)) => store.get_tree(dir, tree_id).unwrap(), + Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await.unwrap(), _ => Tree::null(store.clone(), dir.clone()), } } /// Gets the given 
tree if `value` is a tree, otherwise an empty tree. - fn tree(tree: &MergedTree, dir: &RepoPath, values: &Merge<Option<TreeValue>>) -> MergedTree { + async fn tree( + tree: &MergedTree, + dir: &RepoPath, + values: &Merge<Option<TreeValue>>, + ) -> MergedTree { let trees = if values.is_tree() { - values.map(|value| Self::single_tree(tree.store(), dir, value.as_ref())) + let builder: MergeBuilder<Tree> = futures::stream::iter(values.iter()) + .then(|value| Self::single_tree(tree.store(), dir, value.as_ref())) + .collect() + .await; + builder.build() } else { Merge::resolved(Tree::null(tree.store().clone(), dir.clone())) }; @@ -882,8 +891,11 @@ impl Iterator for TreeDiffIterator<'_> { let tree_after = after.is_tree(); let post_subdir = if (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing() { - let before_tree = Self::tree(dir.tree1.as_ref(), &path, &before); - let after_tree = Self::tree(dir.tree2.as_ref(), &path, &after); + let (before_tree, after_tree) = futures::executor::block_on(async { + let before_tree = Self::tree(dir.tree1.as_ref(), &path, &before); + let after_tree = Self::tree(dir.tree2.as_ref(), &path, &after); + futures::join!(before_tree, after_tree) + }); let subdir = TreeDiffDirItem::new(path.clone(), before_tree, after_tree); self.stack.push(TreeDiffItem::Dir(subdir)); self.stack.len() - 1