forked from mirrors/jj
merged_tree: drop outer loop in TreeDiffStreamImpl::poll_next()
As suggested by Yuya. I also added a comment and an assertion in the case where return `Poll::Pending`.
This commit is contained in:
parent
d989d4093d
commit
c77417d4e4
1 changed files with 49 additions and 49 deletions
|
@ -1176,59 +1176,59 @@ impl Stream for TreeDiffStreamImpl<'_> {
|
|||
type Item = (RepoPath, BackendResult<(MergedTreeValue, MergedTreeValue)>);
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
while !(self.items.is_empty() && self.pending_trees.is_empty()) {
|
||||
// Go through all pending tree futures and poll them.
|
||||
let mut pending_index = 0;
|
||||
while pending_index < self.pending_trees.len()
|
||||
&& (pending_index < self.max_concurrent_reads
|
||||
|| self.items.len() < self.max_queued_items)
|
||||
{
|
||||
let (_, future) = &mut self.pending_trees[pending_index];
|
||||
if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
|
||||
let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
|
||||
let key = DiffStreamKey::normal(dir);
|
||||
// Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry
|
||||
// to `self.items`.
|
||||
let (before, after) = self.items.remove(&key).unwrap().unwrap();
|
||||
// If this was a transition from file to tree or vice versa, add back an item
|
||||
// for just the removal/addition of the file.
|
||||
if before.is_present() && !before.is_tree() {
|
||||
self.items
|
||||
.insert(key.clone(), Ok((before, Merge::absent())));
|
||||
} else if after.is_present() && !after.is_tree() {
|
||||
self.items.insert(
|
||||
DiffStreamKey::file_after_dir(key.path.clone()),
|
||||
Ok((Merge::absent(), after)),
|
||||
);
|
||||
}
|
||||
self.add_dir_diff_items(key.path, tree_diff);
|
||||
} else {
|
||||
pending_index += 1;
|
||||
// Go through all pending tree futures and poll them.
|
||||
let mut pending_index = 0;
|
||||
while pending_index < self.pending_trees.len()
|
||||
&& (pending_index < self.max_concurrent_reads
|
||||
|| self.items.len() < self.max_queued_items)
|
||||
{
|
||||
let (_, future) = &mut self.pending_trees[pending_index];
|
||||
if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
|
||||
let (dir, _) = self.pending_trees.remove(pending_index).unwrap();
|
||||
let key = DiffStreamKey::normal(dir);
|
||||
// Whenever we add an entry to `self.pending_trees`, we also add an Ok() entry
|
||||
// to `self.items`.
|
||||
let (before, after) = self.items.remove(&key).unwrap().unwrap();
|
||||
// If this was a transition from file to tree or vice versa, add back an item
|
||||
// for just the removal/addition of the file.
|
||||
if before.is_present() && !before.is_tree() {
|
||||
self.items
|
||||
.insert(key.clone(), Ok((before, Merge::absent())));
|
||||
} else if after.is_present() && !after.is_tree() {
|
||||
self.items.insert(
|
||||
DiffStreamKey::file_after_dir(key.path.clone()),
|
||||
Ok((Merge::absent(), after)),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Now emit the first file, or the first tree that completed with an error
|
||||
while let Some(entry) = self.items.first_entry() {
|
||||
match entry.get() {
|
||||
Err(_) => {
|
||||
// File or tree with error
|
||||
let (key, result) = entry.remove_entry();
|
||||
return Poll::Ready(Some((key.path, result)));
|
||||
}
|
||||
Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
|
||||
let (key, result) = entry.remove_entry();
|
||||
return Poll::Ready(Some((key.path, result)));
|
||||
}
|
||||
_ => {
|
||||
if !self.pending_trees.is_empty() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
};
|
||||
self.add_dir_diff_items(key.path, tree_diff);
|
||||
} else {
|
||||
pending_index += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(None)
|
||||
// Now emit the first file, or the first tree that completed with an error
|
||||
if let Some(entry) = self.items.first_entry() {
|
||||
match entry.get() {
|
||||
Err(_) => {
|
||||
// File or tree with error
|
||||
let (key, result) = entry.remove_entry();
|
||||
Poll::Ready(Some((key.path, result)))
|
||||
}
|
||||
Ok((before, after)) if !before.is_tree() && !after.is_tree() => {
|
||||
// A diff with no trees involved
|
||||
let (key, result) = entry.remove_entry();
|
||||
Poll::Ready(Some((key.path, result)))
|
||||
}
|
||||
_ => {
|
||||
// The first entry has a tree on at least one side (before or after). We need to
|
||||
// wait for that future to complete.
|
||||
assert!(!self.pending_trees.is_empty());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Poll::Ready(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue