fix: remove heap

This commit is contained in:
leeeon233 2022-12-29 12:28:24 +08:00 committed by Leonzhao
parent 524916239a
commit e3a93be6a2

View file

@ -3,7 +3,7 @@ use num::Zero;
use super::DagUtils;
use std::{
collections::BTreeMap,
ops::{Range, RangeBounds},
ops::{Add, Range, RangeBounds},
};
use crate::version::IdSpanVector;
@ -198,7 +198,6 @@ pub(crate) struct DagCausalIter<'a, Dag> {
in_degrees: FxHashMap<ID, usize>,
succ: BTreeMap<ID, SmallVec<[ID; 2]>>,
stack: Vec<ID>,
heap: BinaryHeap<(bool, ID)>,
}
#[derive(Debug)]
@ -213,42 +212,36 @@ pub(crate) struct IterReturn<'a, T> {
impl<'a, T: DagNode, D: Dag<Node = T>> DagCausalIter<'a, D> {
pub fn new(dag: &'a D, from: SmallVec<[ID; 2]>, target: IdSpanVector) -> Self {
// make dag
let mut in_degrees = FxHashMap::default();
let mut in_degrees: FxHashMap<ID, usize> = FxHashMap::default();
let mut succ = BTreeMap::default();
let mut stack = Vec::new();
let mut heap = BinaryHeap::default();
let mut q = vec![];
for id in target.iter() {
if id.1.content_len() > 0 {
let id = id.id_start();
let node = dag.get(id).unwrap();
let diff = id.counter - node.id_start().counter;
heap.push(IdHeapItem {
id,
lamport: node.lamport() + diff as Lamport,
});
q.push(id);
}
}
while let Some(id) = heap.pop() {
q.push(id.id)
}
// traverse all nodes
while let Some(id) = q.pop() {
let client = id.client_id;
let node = dag.get(id).unwrap();
let deps = node.deps();
if id.counter == target.get(&client).unwrap().min() {
// right after the `from` node can be appended causally
// target start nodes maybe have deps relation so we use Lamport.
stack.push(id);
} else {
in_degrees.insert(id, deps.len());
if deps.len().is_zero() {
in_degrees.insert(id, 0);
}
for dep in node.deps().iter() {
for dep in deps.iter() {
let filter = if let Some(span) = target.get(&dep.client_id) {
dep.counter < span.min()
} else {
true
};
if filter {
in_degrees.entry(id).or_insert(0);
} else {
in_degrees.entry(id).and_modify(|i| *i += 1).or_insert(1);
}
succ.entry(*dep).or_insert_with(SmallVec::new).push(id);
}
let mut target_span = *target.get(&client).unwrap();
@ -259,6 +252,14 @@ impl<'a, T: DagNode, D: Dag<Node = T>> DagCausalIter<'a, D> {
}
}
in_degrees.retain(|k, v| {
if v.is_zero() {
stack.push(*k);
return false;
}
true
});
Self {
dag,
frontier: from,
@ -266,7 +267,6 @@ impl<'a, T: DagNode, D: Dag<Node = T>> DagCausalIter<'a, D> {
in_degrees,
succ,
stack,
heap: Default::default(),
}
}
}
@ -331,6 +331,7 @@ impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagCausalIter<'a, D> {
let current_client = node_id.client_id;
let mut keys = Vec::new();
let mut heap = BinaryHeap::new();
// The in-degree of the successor node minus 1, and if it becomes 0, it is added to the heap
for (key, succ) in self.succ.range((node.id_start(), node.id_end())) {
keys.push(*key);
@ -338,8 +339,7 @@ impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagCausalIter<'a, D> {
self.in_degrees.entry(*succ_id).and_modify(|i| *i -= 1);
if let Some(in_degree) = self.in_degrees.get(succ_id) {
if in_degree.is_zero() {
self.heap
.push((succ_id.client_id == current_client, *succ_id));
heap.push((succ_id.client_id != current_client, *succ_id));
self.in_degrees.remove(succ_id);
}
}
@ -349,7 +349,7 @@ impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagCausalIter<'a, D> {
keys.into_iter().for_each(|k| {
self.succ.remove(&k);
});
while let Some(id) = self.heap.pop() {
while let Some(id) = heap.pop() {
self.stack.push(id.1)
}
@ -364,7 +364,6 @@ impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagCausalIter<'a, D> {
#[cfg(test)]
mod test {
use crate::{
change::ChangeMergeCfg,
configure::Configure,
@ -498,11 +497,11 @@ mod test {
.unwrap();
let mut from_vv = VersionVector::new();
from_vv.set_end(ID {
from_vv.set_last(ID {
client_id: 1,
counter: 1,
});
from_vv.set_end(ID {
from_vv.set_last(ID {
client_id: 2,
counter: 1,
});