Remove dropped subscription eagerly when removing callbacks

This commit is contained in:
Max Brunsfeld 2023-01-06 11:03:45 -08:00
parent 4708f5d88f
commit ef192a902a

View file

@ -60,17 +60,24 @@ impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) { pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
let mut this = self.internal.lock(); let mut this = self.internal.lock();
if !this.dropped_subscriptions.contains(&(key, subscription_id)) {
this.callbacks // If this callback's subscription was dropped before the callback was
.entry(key) // added, then just drop the callback.
.or_default() if this.dropped_subscriptions.remove(&(key, subscription_id)) {
.insert(subscription_id, callback); return;
} }
this.callbacks
.entry(key)
.or_default()
.insert(subscription_id, callback);
} }
pub fn remove(&mut self, key: K) { pub fn remove(&mut self, key: K) {
let callbacks = self.internal.lock().callbacks.remove(&key); let callbacks = self.internal.lock().callbacks.remove(&key);
// drop these after releasing the lock
// Drop these callbacks after releasing the lock, in case one of them
// owns a subscription to this callback collection.
drop(callbacks); drop(callbacks);
} }
@ -83,15 +90,19 @@ impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
let callbacks = self.internal.lock().callbacks.remove(&key); let callbacks = self.internal.lock().callbacks.remove(&key);
if let Some(callbacks) = callbacks { if let Some(callbacks) = callbacks {
for (subscription_id, mut callback) in callbacks { for (subscription_id, mut callback) in callbacks {
if !self // If this callback's subscription was dropped while invoking an
// earlier callback, then just drop this callback.
if self
.internal .internal
.lock() .lock()
.dropped_subscriptions .dropped_subscriptions
.contains(&(key, subscription_id)) .remove(&(key, subscription_id))
{ {
if call_callback(&mut callback, cx) { continue;
self.add_callback(key, subscription_id, callback); }
}
if call_callback(&mut callback, cx) {
self.add_callback(key, subscription_id, callback);
} }
} }
} }
@ -119,17 +130,24 @@ impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
} }
impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> { impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
// If the callback has been initialized (no callback in the list for the key and id),
// add this subscription id and key to the dropped subscriptions list
// Otherwise, just remove the associated callback from the callback collection
fn drop(&mut self) { fn drop(&mut self) {
if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) { if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
let mut mapping = mapping.lock(); let mut mapping = mapping.lock();
// If the callback is present in the mapping, then just remove it.
if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) { if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
if callbacks.remove(&self.id).is_some() { let callback = callbacks.remove(&self.id);
if callback.is_some() {
drop(mapping);
drop(callback);
return; return;
} }
} }
// If this subscription's callback is not present, then either it has been
// temporarily removed during emit, or it has not yet been added. Record
// that this subscription has been dropped so that the callback can be
// removed later.
mapping mapping
.dropped_subscriptions .dropped_subscriptions
.insert((self.key.clone(), self.id)); .insert((self.key.clone(), self.id));