merged_tree: add a Stream for concurrent diff off trees

When diffing two trees, we currently start at the root and diff those
trees. Then we diff each subtree, one at a time, recursively. When
using a commit backend that uses remote storage, like our backend at
Google does, diffing the subtrees one at a time gets very slow. We
should be able to diff subtrees concurrently. That way, the number of
roundtrips to a server becomes determined by the depth of the deepest
difference instead of by the number of differing trees (times 2,
even). This patch implements such an algorithm behind a `Stream`
interface. It's not hooked in to `MergedTree::diff_stream()` yet; that
will happen in the next commit.

I timed the new implementation by updating `jj diff -s` to use the new
diff stream and then ran it on the Linux repo with `jj diff
--ignore-working-copy -s --from v5.0 --to v6.0`. That slowed down by
~20%, from ~750 ms to ~900 ms. Maybe we can get some of that
performance back but I think it'll be hard to match
`MergedTree::diff()`. We can decide later if we're okay with the
difference (after hopefully reducing the gap a bit) or if we want to
keep both implementations.

I also timed the new implementation on our cloud-based repo at
Google. As expected, it made some diffs much faster (I'm not sure if
I'm allowed to share figures).
This commit is contained in:
Martin von Zweigbergk 2023-10-19 11:27:55 -07:00 committed by Martin von Zweigbergk
parent 9af09ec236
commit f40adb84fc
2 changed files with 287 additions and 10 deletions

View file

@ -14,15 +14,16 @@
//! A lazily merged view of a set of trees.
use std::cmp::max;
use std::collections::BTreeMap;
use std::cmp::{max, Ordering};
use std::collections::{BTreeMap, VecDeque};
use std::iter::zip;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{iter, vec};
use futures::stream::StreamExt;
use futures::{Stream, TryStreamExt};
use futures::{Future, Stream, TryStreamExt};
use itertools::Itertools;
use pollster::FutureExt;
@ -836,7 +837,9 @@ enum TreeDiffItem {
}
impl<'matcher> TreeDiffIterator<'matcher> {
fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self {
/// Creates a iterator over the differences between two trees. Generally
/// prefer `MergedTree::diff()` of calling this directly.
pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self {
let root_dir = RepoPath::root();
let mut stack = Vec::new();
if !matcher.visit(&root_dir).is_nothing() {
@ -970,6 +973,251 @@ impl Iterator for TreeDiffIterator<'_> {
}
}
/// Stream of differences between two trees.
pub struct TreeDiffStreamImpl<'matcher> {
matcher: &'matcher dyn Matcher,
legacy_format_before: bool,
legacy_format_after: bool,
/// Pairs of tree values that may or may not be ready to emit, sorted in the
/// order we want to emit them. If either side is a tree, there will be
/// a corresponding entry in `pending_trees`.
items: BTreeMap<DiffStreamKey, BackendResult<(MergedTreeValue, MergedTreeValue)>>,
// TODO: Is it better to combine this and `items` into a single map?
#[allow(clippy::type_complexity)]
pending_trees: VecDeque<(
RepoPath,
Pin<Box<dyn Future<Output = BackendResult<(MergedTree, MergedTree)>> + 'matcher>>,
)>,
/// The maximum number of trees to request concurrently. However, we do the
/// accounting per path, so for there will often be twice as many pending
/// `Backend::read_tree()` calls - for the "before" and "after" sides. For
/// conflicts, there will be even more.
max_concurrent_reads: usize,
/// The maximum number of items in `items`. However, we will always add the
/// full differences from a particular pair of trees, so it may temporarily
/// go over the limit (until we emit those items). It may also go over the
/// limit because we have a file item that's blocked by pending subdirectory
/// items.
max_queued_items: usize,
}
/// A wrapper around `RepoPath` that allows us to optionally sort files after
/// directories that have the file as a prefix.
#[derive(PartialEq, Eq, Clone, Debug)]
struct DiffStreamKey {
path: RepoPath,
file_after_dir: bool,
}
impl DiffStreamKey {
fn normal(path: RepoPath) -> Self {
DiffStreamKey {
path,
file_after_dir: false,
}
}
fn file_after_dir(path: RepoPath) -> Self {
DiffStreamKey {
path,
file_after_dir: true,
}
}
}
impl PartialOrd for DiffStreamKey {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for DiffStreamKey {
fn cmp(&self, other: &Self) -> Ordering {
if self == other {
Ordering::Equal
} else if self.file_after_dir && self.path.contains(&other.path) {
Ordering::Greater
} else if other.file_after_dir && other.path.contains(&self.path) {
Ordering::Less
} else {
self.path.cmp(&other.path)
}
}
}
impl<'matcher> TreeDiffStreamImpl<'matcher> {
/// Creates a iterator over the differences between two trees. Generally
/// prefer `MergedTree::diff_stream()` of calling this directly.
pub fn new(tree1: MergedTree, tree2: MergedTree, matcher: &'matcher dyn Matcher) -> Self {
let mut stream = Self {
matcher,
legacy_format_before: matches!(tree1, MergedTree::Legacy(_)),
legacy_format_after: matches!(tree2, MergedTree::Legacy(_)),
items: BTreeMap::new(),
pending_trees: VecDeque::new(),
// TODO: maybe the backends can suggest the conurrency limit?
max_concurrent_reads: 100,
max_queued_items: 10000,
};
stream.add_dir_diff_items(RepoPath::root(), Ok((tree1, tree2)));
stream
}
/// Gets the given tree if `value` is a tree, otherwise an empty tree.
async fn tree(
store: Arc<Store>,
legacy_format: bool,
dir: RepoPath,
values: MergedTreeValue,
) -> BackendResult<MergedTree> {
let trees = if values.is_tree() {
let builder: MergeBuilder<Tree> = futures::stream::iter(values.iter())
.then(|value| TreeDiffIterator::single_tree(&store, &dir, value.as_ref()))
.try_collect()
.await?;
builder.build()
} else {
Merge::resolved(Tree::null(store, dir.clone()))
};
// Maintain the type of tree, so we resolve `TreeValue::Conflict` as necessary
// in the subtree
if legacy_format {
Ok(MergedTree::Legacy(trees.into_resolved().unwrap()))
} else {
Ok(MergedTree::Merge(trees))
}
}
fn add_dir_diff_items(
&mut self,
dir: RepoPath,
tree_diff: BackendResult<(MergedTree, MergedTree)>,
) {
let (tree1, tree2) = match tree_diff {
Ok(trees) => trees,
Err(err) => {
self.items.insert(DiffStreamKey::normal(dir), Err(err));
return;
}
};
for basename in merged_tree_basenames(&tree1, &tree2) {
let value_before = tree1.value(basename);
let value_after = tree2.value(basename);
if value_after != value_before {
let path = dir.join(basename);
let before = value_before.to_merge();
let after = value_after.to_merge();
let tree_before = before.is_tree();
let tree_after = after.is_tree();
// Check if trees and files match, but only if either side is a tree or a file
// (don't query the matcher unnecessarily).
let tree_matches =
(tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
// Replace trees or files that don't match by `Merge::absent()`
let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
before
} else {
Merge::absent()
};
let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
after
} else {
Merge::absent()
};
if before.is_absent() && after.is_absent() {
continue;
}
// If the path was a tree on either side of the diff, read those trees.
if tree_matches {
let before_tree_future = Self::tree(
tree1.store().clone(),
self.legacy_format_before,
path.clone(),
before.clone(),
);
let after_tree_future = Self::tree(
tree2.store().clone(),
self.legacy_format_after,
path.clone(),
after.clone(),
);
let both_trees_future =
async { futures::try_join!(before_tree_future, after_tree_future) };
self.pending_trees
.push_back((path.clone(), Box::pin(both_trees_future)));
}
self.items
.insert(DiffStreamKey::normal(path), Ok((before, after)));
}
}
}
}
impl Stream for TreeDiffStreamImpl<'_> {
type Item = (RepoPath, BackendResult<(MergedTreeValue, MergedTreeValue)>);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
while !(self.items.is_empty() && self.pending_trees.is_empty()) {
// Go through all pending tree futures and poll them.
let mut pending_index = 0;
while pending_index < self.pending_trees.len()
&& (pending_index < self.max_concurrent_reads
|| self.items.len() < self.max_queued_items)
{
let (_, future) = &mut self.pending_trees[pending_index];
if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
let key = DiffStreamKey::normal(dir);
// Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry
// to `self.items`.
let (before, after) = self.items.remove(&key).unwrap().unwrap();
// If this was a transition from file to tree or vice versa, add back an item
// for just the removal/addition of the file.
if before.is_present() && !before.is_tree() {
self.items
.insert(key.clone(), Ok((before, Merge::absent())));
} else if after.is_present() && !after.is_tree() {
self.items.insert(
DiffStreamKey::file_after_dir(key.path.clone()),
Ok((Merge::absent(), after)),
);
}
self.add_dir_diff_items(key.path, tree_diff);
} else {
pending_index += 1;
}
}
// Now emit the first file, or the first tree that completed with an error
while let Some(entry) = self.items.first_entry() {
match entry.get() {
Err(_) => {
// File or tree with error
let (key, result) = entry.remove_entry();
return Poll::Ready(Some((key.path, result)));
}
Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
let (key, result) = entry.remove_entry();
return Poll::Ready(Some((key.path, result)));
}
_ => {
if !self.pending_trees.is_empty() {
return Poll::Pending;
}
}
};
}
}
Poll::Ready(None)
}
}
/// Helps with writing trees with conflicts. You start by creating an instance
/// of this type with one or more base trees. You then add overrides on top. The
/// overrides may be conflicts. Then you can write the result as a legacy tree

View file

@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::executor::block_on;
use futures::StreamExt;
use itertools::Itertools;
use jj_lib::backend::{FileId, MergedTreeId, TreeValue};
use jj_lib::files::MergeResult;
use jj_lib::matchers::{EverythingMatcher, FilesMatcher, PrefixMatcher};
use jj_lib::matchers::{EverythingMatcher, FilesMatcher, Matcher, PrefixMatcher};
use jj_lib::merge::{Merge, MergeBuilder};
use jj_lib::merged_tree::{MergedTree, MergedTreeBuilder, MergedTreeVal};
use jj_lib::merged_tree::{
MergedTree, MergedTreeBuilder, MergedTreeVal, TreeDiffIterator, TreeDiffStreamImpl,
};
use jj_lib::repo::Repo;
use jj_lib::repo_path::{RepoPath, RepoPathComponent, RepoPathJoin};
use jj_lib::tree::merge_trees;
@ -31,6 +35,18 @@ fn file_value(file_id: &FileId) -> TreeValue {
}
}
fn diff_stream_equals_iter(tree1: &MergedTree, tree2: &MergedTree, matcher: &dyn Matcher) {
let iter_diff: Vec<_> = TreeDiffIterator::new(tree1.clone(), tree2.clone(), matcher)
.map(|(path, diff)| (path, diff.unwrap()))
.collect();
let stream_diff: Vec<_> = block_on(
TreeDiffStreamImpl::new(tree1.clone(), tree2.clone(), matcher)
.map(|(path, diff)| (path, diff.unwrap()))
.collect(),
);
assert_eq!(stream_diff, iter_diff);
}
#[test]
fn test_from_legacy_tree() {
let test_repo = TestRepo::init();
@ -714,6 +730,7 @@ fn test_diff_resolved() {
),
)
);
diff_stream_equals_iter(&before_merged, &after_merged, &EverythingMatcher);
}
/// Diff two conflicted trees
@ -805,6 +822,7 @@ fn test_diff_conflicted() {
})
.collect_vec();
assert_eq!(actual_diff, expected_diff);
diff_stream_equals_iter(&left_merged, &right_merged, &EverythingMatcher);
// Test the reverse diff
let actual_diff = right_merged
.diff(&left_merged, &EverythingMatcher)
@ -820,6 +838,7 @@ fn test_diff_conflicted() {
})
.collect_vec();
assert_eq!(actual_diff, expected_diff);
diff_stream_equals_iter(&right_merged, &left_merged, &EverythingMatcher);
}
#[test]
@ -972,6 +991,7 @@ fn test_diff_dir_file() {
),
];
assert_eq!(actual_diff, expected_diff);
diff_stream_equals_iter(&left_merged, &right_merged, &EverythingMatcher);
}
// Test the reverse diff
@ -1036,12 +1056,14 @@ fn test_diff_dir_file() {
),
];
assert_eq!(actual_diff, expected_diff);
diff_stream_equals_iter(&right_merged, &left_merged, &EverythingMatcher);
}
// Diff while filtering by `path1` (file1 -> directory1) as a file
{
let matcher = FilesMatcher::new(&[path1.clone()]);
let actual_diff = left_merged
.diff(&right_merged, &FilesMatcher::new(&[path1.clone()]))
.diff(&right_merged, &matcher)
.map(|(path, diff)| (path, diff.unwrap()))
.collect_vec();
let expected_diff = vec![
@ -1052,12 +1074,14 @@ fn test_diff_dir_file() {
),
];
assert_eq!(actual_diff, expected_diff);
diff_stream_equals_iter(&left_merged, &right_merged, &matcher);
}
// Diff while filtering by `path1/file` (file1 -> directory1) as a file
{
let matcher = FilesMatcher::new(&[path1.join(&file)]);
let actual_diff = left_merged
.diff(&right_merged, &FilesMatcher::new(&[path1.join(&file)]))
.diff(&right_merged, &matcher)
.map(|(path, diff)| (path, diff.unwrap()))
.collect_vec();
let expected_diff = vec![
@ -1068,12 +1092,14 @@ fn test_diff_dir_file() {
),
];
assert_eq!(actual_diff, expected_diff);
diff_stream_equals_iter(&left_merged, &right_merged, &matcher);
}
// Diff while filtering by `path1` (file1 -> directory1) as a prefix
{
let matcher = PrefixMatcher::new(&[path1.clone()]);
let actual_diff = left_merged
.diff(&right_merged, &PrefixMatcher::new(&[path1.clone()]))
.diff(&right_merged, &matcher)
.map(|(path, diff)| (path, diff.unwrap()))
.collect_vec();
let expected_diff = vec![
@ -1087,6 +1113,7 @@ fn test_diff_dir_file() {
),
];
assert_eq!(actual_diff, expected_diff);
diff_stream_equals_iter(&left_merged, &right_merged, &matcher);
}
// Diff while filtering by `path6` (directory1 -> file1+(directory1-absent)) as
@ -1094,8 +1121,9 @@ fn test_diff_dir_file() {
// do see the directory that's included in the conflict with a file on the right
// side.
{
let matcher = FilesMatcher::new(&[path6.clone()]);
let actual_diff = left_merged
.diff(&right_merged, &FilesMatcher::new(&[path6.clone()]))
.diff(&right_merged, &matcher)
.map(|(path, diff)| (path, diff.unwrap()))
.collect_vec();
let expected_diff = vec![(
@ -1103,6 +1131,7 @@ fn test_diff_dir_file() {
(Merge::absent(), right_merged.path_value(&path6)),
)];
assert_eq!(actual_diff, expected_diff);
diff_stream_equals_iter(&left_merged, &right_merged, &matcher);
}
}