forked from mirrors/jj
merged_tree: read before/after trees concurrently
I'm going to rewrite `TreeDiffIterator` to fetch one level (depth) of the tree at a time and concurrently. One step towards that is to convert the iterator to a `Stream`. I'd like to do that by making the current `Iterator` implementation call the new `Stream` implementation. However, we can't call `futures::executor::block_on()` on a future that itself calls `futures::executor::block_on()` (as `Store::read_tree()` does), so the first step is to bubble up the async-ness a bit. This patch does that by fetching both sides of the diff concurrently. That should give close to a 2x speedup on high-latency backends. (It doesn't help with our backend at Google, however, because we have a daemon process that does some speculative prefetching that usually downloads the child trees anyway.)
This commit is contained in:
parent
815cf9bf07
commit
1b9a3e27e0
1 changed files with 18 additions and 6 deletions
|
@ -20,6 +20,7 @@ use std::iter::zip;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{iter, vec};
|
use std::{iter, vec};
|
||||||
|
|
||||||
|
use futures::stream::StreamExt;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
|
||||||
use crate::backend::{BackendError, BackendResult, ConflictId, MergedTreeId, TreeId, TreeValue};
|
use crate::backend::{BackendError, BackendResult, ConflictId, MergedTreeId, TreeId, TreeValue};
|
||||||
|
@ -816,17 +817,25 @@ impl<'matcher> TreeDiffIterator<'matcher> {
|
||||||
Self { stack, matcher }
|
Self { stack, matcher }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn single_tree(store: &Arc<Store>, dir: &RepoPath, value: Option<&TreeValue>) -> Tree {
|
async fn single_tree(store: &Arc<Store>, dir: &RepoPath, value: Option<&TreeValue>) -> Tree {
|
||||||
match value {
|
match value {
|
||||||
Some(TreeValue::Tree(tree_id)) => store.get_tree(dir, tree_id).unwrap(),
|
Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await.unwrap(),
|
||||||
_ => Tree::null(store.clone(), dir.clone()),
|
_ => Tree::null(store.clone(), dir.clone()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets the given tree if `value` is a tree, otherwise an empty tree.
|
/// Gets the given tree if `value` is a tree, otherwise an empty tree.
|
||||||
fn tree(tree: &MergedTree, dir: &RepoPath, values: &Merge<Option<TreeValue>>) -> MergedTree {
|
async fn tree(
|
||||||
|
tree: &MergedTree,
|
||||||
|
dir: &RepoPath,
|
||||||
|
values: &Merge<Option<TreeValue>>,
|
||||||
|
) -> MergedTree {
|
||||||
let trees = if values.is_tree() {
|
let trees = if values.is_tree() {
|
||||||
values.map(|value| Self::single_tree(tree.store(), dir, value.as_ref()))
|
let builder: MergeBuilder<Tree> = futures::stream::iter(values.iter())
|
||||||
|
.then(|value| Self::single_tree(tree.store(), dir, value.as_ref()))
|
||||||
|
.collect()
|
||||||
|
.await;
|
||||||
|
builder.build()
|
||||||
} else {
|
} else {
|
||||||
Merge::resolved(Tree::null(tree.store().clone(), dir.clone()))
|
Merge::resolved(Tree::null(tree.store().clone(), dir.clone()))
|
||||||
};
|
};
|
||||||
|
@ -882,8 +891,11 @@ impl Iterator for TreeDiffIterator<'_> {
|
||||||
let tree_after = after.is_tree();
|
let tree_after = after.is_tree();
|
||||||
let post_subdir =
|
let post_subdir =
|
||||||
if (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing() {
|
if (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing() {
|
||||||
let before_tree = Self::tree(dir.tree1.as_ref(), &path, &before);
|
let (before_tree, after_tree) = futures::executor::block_on(async {
|
||||||
let after_tree = Self::tree(dir.tree2.as_ref(), &path, &after);
|
let before_tree = Self::tree(dir.tree1.as_ref(), &path, &before);
|
||||||
|
let after_tree = Self::tree(dir.tree2.as_ref(), &path, &after);
|
||||||
|
futures::join!(before_tree, after_tree)
|
||||||
|
});
|
||||||
let subdir = TreeDiffDirItem::new(path.clone(), before_tree, after_tree);
|
let subdir = TreeDiffDirItem::new(path.clone(), before_tree, after_tree);
|
||||||
self.stack.push(TreeDiffItem::Dir(subdir));
|
self.stack.push(TreeDiffItem::Dir(subdir));
|
||||||
self.stack.len() - 1
|
self.stack.len() - 1
|
||||||
|
|
Loading…
Reference in a new issue