mirror of
https://github.com/loro-dev/loro.git
synced 2025-02-10 22:39:29 +00:00
fix: lock issues
This commit is contained in:
parent
a134bf88e3
commit
a7816af173
1 changed files with 22 additions and 18 deletions
|
@ -133,7 +133,7 @@ impl AppDag {
|
|||
}
|
||||
});
|
||||
|
||||
let mut map = self.map.lock().unwrap();
|
||||
let mut map = self.map.try_lock().unwrap();
|
||||
if !pushed {
|
||||
map.insert(node.id_start(), node);
|
||||
}
|
||||
|
@ -143,7 +143,7 @@ impl AppDag {
|
|||
continue;
|
||||
}
|
||||
|
||||
let ans = self.with_node_mut(*dep, |target| {
|
||||
let ans = self.with_node_mut(&mut map, *dep, |target| {
|
||||
// We don't need to break
|
||||
let target = target?;
|
||||
if target.ctr_last() == dep.counter {
|
||||
|
@ -163,7 +163,7 @@ impl AppDag {
|
|||
if let Some(new_node) = ans {
|
||||
map.insert(new_node.id_start(), new_node);
|
||||
} else {
|
||||
self.unhandled_dep_points.lock().unwrap().insert(*dep);
|
||||
self.unhandled_dep_points.try_lock().unwrap().insert(*dep);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -173,9 +173,13 @@ impl AppDag {
|
|||
}
|
||||
}
|
||||
|
||||
fn with_node_mut<R>(&self, id: ID, f: impl FnOnce(Option<&mut AppDagNode>) -> R) -> R {
|
||||
fn with_node_mut<R>(
|
||||
&self,
|
||||
map: &mut BTreeMap<ID, AppDagNode>,
|
||||
id: ID,
|
||||
f: impl FnOnce(Option<&mut AppDagNode>) -> R,
|
||||
) -> R {
|
||||
self.ensure_lazy_load_node(id);
|
||||
let mut map = self.map.lock().unwrap();
|
||||
let x = map.range_mut(..=id).next_back();
|
||||
if let Some((_, node)) = x {
|
||||
if node.contains_id(id) {
|
||||
|
@ -221,7 +225,7 @@ impl AppDag {
|
|||
f: impl FnOnce(Option<&mut AppDagNode>) -> R,
|
||||
) -> R {
|
||||
self.lazy_load_last_of_peer(peer);
|
||||
let mut binding = self.map.lock().unwrap();
|
||||
let mut binding = self.map.try_lock().unwrap();
|
||||
let last = binding
|
||||
.range_mut(..=ID::new(peer, Counter::MAX))
|
||||
.next_back()
|
||||
|
@ -235,7 +239,7 @@ impl AppDag {
|
|||
}
|
||||
|
||||
pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) {
|
||||
let unparsed_vv = self.unparsed_vv.lock().unwrap();
|
||||
let unparsed_vv = self.unparsed_vv.try_lock().unwrap();
|
||||
if !unparsed_vv.contains_key(&peer) || self.vv[&peer] >= unparsed_vv[&peer] {
|
||||
return;
|
||||
}
|
||||
|
@ -251,7 +255,7 @@ impl AppDag {
|
|||
assert!(!nodes.is_empty());
|
||||
let mut map = self.map.try_lock().unwrap();
|
||||
let new_dag_start_counter_for_the_peer = nodes[0].cnt;
|
||||
let mut unparsed_vv = self.unparsed_vv.lock().unwrap();
|
||||
let mut unparsed_vv = self.unparsed_vv.try_lock().unwrap();
|
||||
let end_counter = unparsed_vv[&peer];
|
||||
let mut deps_on_others = vec![];
|
||||
for mut node in nodes {
|
||||
|
@ -274,7 +278,7 @@ impl AppDag {
|
|||
// PERF: we can try to merge the node with the previous node
|
||||
let break_points: Vec<_> = self
|
||||
.unhandled_dep_points
|
||||
.lock()
|
||||
.try_lock()
|
||||
.unwrap()
|
||||
.range(node.id_start()..node.id_end())
|
||||
.map(|id| (id.counter - node.cnt) as usize)
|
||||
|
@ -300,13 +304,13 @@ impl AppDag {
|
|||
}
|
||||
|
||||
fn handle_deps_break_points(&self, ids: &[ID], skip_peer: PeerID) {
|
||||
let mut map = self.map.lock().unwrap();
|
||||
let mut map = self.map.try_lock().unwrap();
|
||||
for &id in ids.iter() {
|
||||
if id.peer == skip_peer {
|
||||
continue;
|
||||
}
|
||||
|
||||
let ans = self.with_node_mut(id, |target| {
|
||||
let ans = self.with_node_mut(&mut map, id, |target| {
|
||||
// We don't need to break the dag node if it's not loaded yet
|
||||
let target = target?;
|
||||
if target.ctr_last() == id.counter {
|
||||
|
@ -326,13 +330,13 @@ impl AppDag {
|
|||
if let Some(new_node) = ans {
|
||||
map.insert(new_node.id_start(), new_node);
|
||||
} else {
|
||||
self.unhandled_dep_points.lock().unwrap().insert(id);
|
||||
self.unhandled_dep_points.try_lock().unwrap().insert(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn ensure_lazy_load_node(&self, id: ID) {
|
||||
if !self.unparsed_vv.lock().unwrap().includes_id(id) {
|
||||
if !self.unparsed_vv.try_lock().unwrap().includes_id(id) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -346,16 +350,16 @@ impl AppDag {
|
|||
pub(super) fn fork(&self, change_store: ChangeStore) -> AppDag {
|
||||
AppDag {
|
||||
change_store: change_store.clone(),
|
||||
map: Mutex::new(self.map.lock().unwrap().clone()),
|
||||
map: Mutex::new(self.map.try_lock().unwrap().clone()),
|
||||
frontiers: self.frontiers.clone(),
|
||||
vv: self.vv.clone(),
|
||||
unparsed_vv: Mutex::new(self.unparsed_vv.lock().unwrap().clone()),
|
||||
unhandled_dep_points: Mutex::new(self.unhandled_dep_points.lock().unwrap().clone()),
|
||||
unparsed_vv: Mutex::new(self.unparsed_vv.try_lock().unwrap().clone()),
|
||||
unhandled_dep_points: Mutex::new(self.unhandled_dep_points.try_lock().unwrap().clone()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn total_parsed_dag_node(&self) -> usize {
|
||||
self.map.lock().unwrap().len()
|
||||
self.map.try_lock().unwrap().len()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -463,7 +467,7 @@ impl Dag for AppDag {
|
|||
|
||||
fn get(&self, id: ID) -> Option<Self::Node> {
|
||||
self.ensure_lazy_load_node(id);
|
||||
let binding = self.map.lock().unwrap();
|
||||
let binding = self.map.try_lock().unwrap();
|
||||
let x = binding.range(..=id).next_back()?;
|
||||
if x.1.contains_id(id) {
|
||||
// PERF: do we need to optimize clone like this?
|
||||
|
|
Loading…
Reference in a new issue