Merge pull request #2008 from zed-industries/callback-leaks

Fix callback leaks when subscriptions are added and dropped in the same effect cycle
This commit is contained in:
Max Brunsfeld 2023-01-06 12:01:27 -08:00 committed by GitHub
commit 6a57bd2794
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 336 additions and 625 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,19 +1,44 @@
use crate::MutableAppContext;
use collections::{BTreeMap, HashMap, HashSet};
use parking_lot::Mutex;
use std::sync::Arc;
use std::{hash::Hash, sync::Weak};
use parking_lot::Mutex;
use collections::{btree_map, BTreeMap, HashMap};
use crate::MutableAppContext;
pub type Mapping<K, F> = Mutex<HashMap<K, BTreeMap<usize, Option<F>>>>;
pub struct CallbackCollection<K: Hash + Eq, F> {
internal: Arc<Mapping<K, F>>,
pub struct CallbackCollection<K: Clone + Hash + Eq, F> {
internal: Arc<Mutex<Mapping<K, F>>>,
}
impl<K: Hash + Eq, F> Clone for CallbackCollection<K, F> {
pub struct Subscription<K: Clone + Hash + Eq, F> {
key: K,
id: usize,
mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
}
struct Mapping<K, F> {
callbacks: HashMap<K, BTreeMap<usize, F>>,
dropped_subscriptions: HashMap<K, HashSet<usize>>,
}
impl<K: Hash + Eq, F> Mapping<K, F> {
fn clear_dropped_state(&mut self, key: &K, subscription_id: usize) -> bool {
if let Some(subscriptions) = self.dropped_subscriptions.get_mut(&key) {
subscriptions.remove(&subscription_id)
} else {
false
}
}
}
impl<K, F> Default for Mapping<K, F> {
fn default() -> Self {
Self {
callbacks: Default::default(),
dropped_subscriptions: Default::default(),
}
}
}
impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
fn clone(&self) -> Self {
Self {
internal: self.internal.clone(),
@ -21,7 +46,7 @@ impl<K: Hash + Eq, F> Clone for CallbackCollection<K, F> {
}
}
impl<K: Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
fn default() -> Self {
CallbackCollection {
internal: Arc::new(Mutex::new(Default::default())),
@ -29,78 +54,114 @@ impl<K: Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
}
}
impl<K: Hash + Eq + Copy, F> CallbackCollection<K, F> {
pub fn downgrade(&self) -> Weak<Mapping<K, F>> {
Arc::downgrade(&self.internal)
}
impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
#[cfg(test)]
pub fn is_empty(&self) -> bool {
self.internal.lock().is_empty()
self.internal.lock().callbacks.is_empty()
}
pub fn add_callback(&mut self, id: K, subscription_id: usize, callback: F) {
self.internal
.lock()
.entry(id)
.or_default()
.insert(subscription_id, Some(callback));
}
pub fn remove(&mut self, id: K) {
self.internal.lock().remove(&id);
}
pub fn add_or_remove_callback(&mut self, id: K, subscription_id: usize, callback: F) {
match self
.internal
.lock()
.entry(id)
.or_default()
.entry(subscription_id)
{
btree_map::Entry::Vacant(entry) => {
entry.insert(Some(callback));
}
btree_map::Entry::Occupied(entry) => {
// TODO: This seems like it should never be called because no code
// should ever attempt to remove an existing callback
debug_assert!(entry.get().is_none());
entry.remove();
}
pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
Subscription {
key,
id: subscription_id,
mapping: Some(Arc::downgrade(&self.internal)),
}
}
pub fn emit_and_cleanup<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
let mut this = self.internal.lock();
// If this callback's subscription was dropped before the callback was
// added, then just drop the callback.
if this.clear_dropped_state(&key, subscription_id) {
return;
}
this.callbacks
.entry(key)
.or_default()
.insert(subscription_id, callback);
}
pub fn remove(&mut self, key: K) {
// Drop these callbacks after releasing the lock, in case one of them
// owns a subscription to this callback collection.
let mut this = self.internal.lock();
let callbacks = this.callbacks.remove(&key);
this.dropped_subscriptions.remove(&key);
drop(this);
drop(callbacks);
}
pub fn emit<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
&mut self,
id: K,
key: K,
cx: &mut MutableAppContext,
mut call_callback: C,
) {
let callbacks = self.internal.lock().remove(&id);
let callbacks = self.internal.lock().callbacks.remove(&key);
if let Some(callbacks) = callbacks {
for (subscription_id, callback) in callbacks {
if let Some(mut callback) = callback {
let alive = call_callback(&mut callback, cx);
if alive {
match self
.internal
.lock()
.entry(id)
.or_default()
.entry(subscription_id)
{
btree_map::Entry::Vacant(entry) => {
entry.insert(Some(callback));
}
btree_map::Entry::Occupied(entry) => {
entry.remove();
}
}
}
for (subscription_id, mut callback) in callbacks {
// If this callback's subscription was dropped while invoking an
// earlier callback, then just drop the callback.
let mut this = self.internal.lock();
if this.clear_dropped_state(&key, subscription_id) {
continue;
}
drop(this);
let alive = call_callback(&mut callback, cx);
// If this callback's subscription was dropped while invoking the callback
// itself, or if the callback returns false, then just drop the callback.
let mut this = self.internal.lock();
if this.clear_dropped_state(&key, subscription_id) || !alive {
continue;
}
this.callbacks
.entry(key)
.or_default()
.insert(subscription_id, callback);
}
}
}
}
impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
pub fn id(&self) -> usize {
self.id
}
pub fn detach(&mut self) {
self.mapping.take();
}
}
impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
fn drop(&mut self) {
if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
let mut mapping = mapping.lock();
// If the callback is present in the mapping, then just remove it.
if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
let callback = callbacks.remove(&self.id);
if callback.is_some() {
drop(mapping);
drop(callback);
return;
}
}
// If this subscription's callback is not present, then either it has been
// temporarily removed during emit, or it has not yet been added. Record
// that this subscription has been dropped so that the callback can be
// removed later.
mapping
.dropped_subscriptions
.entry(self.key.clone())
.or_default()
.insert(self.id);
}
}
}