From 378f0c32fe35c104288f8f24468a0bed247defff Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 5 Jan 2023 16:41:23 -0800 Subject: [PATCH] Restructure callback subscriptions Fix a callback leak that would occur when dropping a subscription to a callback collection after triggering that callback, but before processing the effect of *adding* the handler. Co-authored-by: Kay Simmons --- crates/gpui/src/app.rs | 360 ++++----------------- crates/gpui/src/app/callback_collection.rs | 197 +++++++---- 2 files changed, 207 insertions(+), 350 deletions(-) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 76b9fb1aa6..dce4e68e7c 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -26,8 +26,8 @@ use smallvec::SmallVec; use smol::prelude::*; pub use action::*; -use callback_collection::{CallbackCollection, Mapping}; -use collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque}; +use callback_collection::CallbackCollection; +use collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque}; use keymap::MatchResult; use platform::Event; #[cfg(any(test, feature = "test-support"))] @@ -1047,12 +1047,10 @@ impl MutableAppContext { callback(payload, cx) }), }); - - Subscription::GlobalSubscription { - id: subscription_id, - type_id, - subscriptions: Some(self.global_subscriptions.downgrade()), - } + Subscription::GlobalSubscription( + self.global_subscriptions + .subscribe(type_id, subscription_id), + ) } pub fn observe(&mut self, handle: &H, mut callback: F) -> Subscription @@ -1089,11 +1087,7 @@ impl MutableAppContext { } }), }); - Subscription::Subscription { - id: subscription_id, - entity_id: handle.id(), - subscriptions: Some(self.subscriptions.downgrade()), - } + Subscription::Subscription(self.subscriptions.subscribe(handle.id(), subscription_id)) } fn observe_internal(&mut self, handle: &H, mut callback: F) -> Subscription @@ -1117,11 +1111,7 @@ impl MutableAppContext { } }), }); - Subscription::Observation { - id: subscription_id, - entity_id, - observations: Some(self.observations.downgrade()), - } + Subscription::Observation(self.observations.subscribe(entity_id, subscription_id)) } fn observe_focus(&mut self, handle: &ViewHandle, mut callback: F) -> Subscription @@ -1144,12 +1134,7 @@ impl MutableAppContext { } }), }); - - Subscription::FocusObservation { - id: subscription_id, - view_id, - observations: Some(self.focus_observations.downgrade()), - } + Subscription::FocusObservation(self.focus_observations.subscribe(view_id, subscription_id)) } pub fn observe_global(&mut self, mut observe: F) -> Subscription @@ -1165,12 +1150,7 @@ impl MutableAppContext { id, Box::new(move |cx: &mut MutableAppContext| observe(cx)), ); - - Subscription::GlobalObservation { - id, - type_id, - observations: Some(self.global_observations.downgrade()), - } + Subscription::GlobalObservation(self.global_observations.subscribe(type_id, id)) } pub fn observe_default_global(&mut self, observe: F) -> Subscription @@ -1235,11 +1215,10 @@ impl MutableAppContext { subscription_id, callback: Box::new(callback), }); - Subscription::WindowActivationObservation { - id: subscription_id, - window_id, - observations: Some(self.window_activation_observations.downgrade()), - } + Subscription::WindowActivationObservation( + self.window_activation_observations + .subscribe(window_id, subscription_id), + ) } fn observe_fullscreen(&mut self, window_id: usize, callback: F) -> Subscription @@ -1253,11 +1232,10 @@ impl MutableAppContext { subscription_id, callback: Box::new(callback), }); - Subscription::WindowFullscreenObservation { - id: subscription_id, - window_id, - observations: Some(self.window_activation_observations.downgrade()), - } + Subscription::WindowActivationObservation( + self.window_activation_observations + .subscribe(window_id, subscription_id), + ) } pub fn observe_keystrokes(&mut self, window_id: usize, callback: F) -> Subscription @@ -1273,12 +1251,10 @@ impl MutableAppContext { let subscription_id = post_inc(&mut self.next_subscription_id); self.keystroke_observations .add_callback(window_id, subscription_id, Box::new(callback)); - - Subscription::KeystrokeObservation { - id: subscription_id, - window_id, - observations: Some(self.keystroke_observations.downgrade()), - } + Subscription::KeystrokeObservation( + self.keystroke_observations + .subscribe(window_id, subscription_id), + ) } pub fn defer(&mut self, callback: impl 'static + FnOnce(&mut MutableAppContext)) { @@ -1999,15 +1975,13 @@ impl MutableAppContext { entity_id, subscription_id, callback, - } => self.subscriptions.add_or_remove_callback( - entity_id, - subscription_id, - callback, - ), + } => self + .subscriptions + .add_callback(entity_id, subscription_id, callback), Effect::Event { entity_id, payload } => { let mut subscriptions = self.subscriptions.clone(); - subscriptions.emit_and_cleanup(entity_id, self, |callback, this| { + subscriptions.emit(entity_id, self, |callback, this| { callback(payload.as_ref(), this) }) } @@ -2016,7 +1990,7 @@ impl MutableAppContext { type_id, subscription_id, callback, - } => self.global_subscriptions.add_or_remove_callback( + } => self.global_subscriptions.add_callback( type_id, subscription_id, callback, @@ -2028,16 +2002,13 @@ impl MutableAppContext { entity_id, subscription_id, callback, - } => self.observations.add_or_remove_callback( - entity_id, - subscription_id, - callback, - ), + } => self + .observations + .add_callback(entity_id, subscription_id, callback), Effect::ModelNotification { model_id } => { let mut observations = self.observations.clone(); - observations - .emit_and_cleanup(model_id, self, |callback, this| callback(this)); + observations.emit(model_id, self, |callback, this| callback(this)); } Effect::ViewNotification { window_id, view_id } => { @@ -2046,7 +2017,7 @@ impl MutableAppContext { Effect::GlobalNotification { type_id } => { let mut subscriptions = self.global_observations.clone(); - subscriptions.emit_and_cleanup(type_id, self, |callback, this| { + subscriptions.emit(type_id, self, |callback, this| { callback(this); true }); @@ -2080,7 +2051,7 @@ impl MutableAppContext { subscription_id, callback, } => { - self.focus_observations.add_or_remove_callback( + self.focus_observations.add_callback( view_id, subscription_id, callback, @@ -2099,7 +2070,7 @@ impl MutableAppContext { window_id, subscription_id, callback, - } => self.window_activation_observations.add_or_remove_callback( + } => self.window_activation_observations.add_callback( window_id, subscription_id, callback, @@ -2114,7 +2085,7 @@ impl MutableAppContext { window_id, subscription_id, callback, - } => self.window_fullscreen_observations.add_or_remove_callback( + } => self.window_fullscreen_observations.add_callback( window_id, subscription_id, callback, @@ -2158,7 +2129,17 @@ impl MutableAppContext { self.pending_notifications.clear(); self.remove_dropped_entities(); } else { + self.focus_observations.gc(); + self.global_subscriptions.gc(); + self.global_observations.gc(); + self.subscriptions.gc(); + self.observations.gc(); + self.window_activation_observations.gc(); + self.window_fullscreen_observations.gc(); + self.keystroke_observations.gc(); + self.remove_dropped_entities(); + if refreshing { self.perform_window_refresh(); } else { @@ -2295,7 +2276,7 @@ impl MutableAppContext { let type_id = (&*payload).type_id(); let mut subscriptions = self.global_subscriptions.clone(); - subscriptions.emit_and_cleanup(type_id, self, |callback, this| { + subscriptions.emit(type_id, self, |callback, this| { callback(payload.as_ref(), this); true //Always alive }); @@ -2320,7 +2301,7 @@ impl MutableAppContext { } let mut observations = self.observations.clone(); - observations.emit_and_cleanup(observed_view_id, self, |callback, this| callback(this)); + observations.emit(observed_view_id, self, |callback, this| callback(this)); } } @@ -2350,7 +2331,7 @@ impl MutableAppContext { window.is_fullscreen = is_fullscreen; let mut observations = this.window_fullscreen_observations.clone(); - observations.emit_and_cleanup(window_id, this, |callback, this| { + observations.emit(window_id, this, |callback, this| { callback(is_fullscreen, this) }); @@ -2367,7 +2348,7 @@ impl MutableAppContext { ) { self.update(|this| { let mut observations = this.keystroke_observations.clone(); - observations.emit_and_cleanup(window_id, this, { + observations.emit(window_id, this, { move |callback, this| callback(&keystroke, &result, handled_by.as_ref(), this) }); }); @@ -2403,7 +2384,7 @@ impl MutableAppContext { } let mut observations = this.window_activation_observations.clone(); - observations.emit_and_cleanup(window_id, this, |callback, this| callback(active, this)); + observations.emit(window_id, this, |callback, this| callback(active, this)); Some(()) }); @@ -2443,8 +2424,7 @@ impl MutableAppContext { } let mut subscriptions = this.focus_observations.clone(); - subscriptions - .emit_and_cleanup(blurred_id, this, |callback, this| callback(false, this)); + subscriptions.emit(blurred_id, this, |callback, this| callback(false, this)); } if let Some(focused_id) = focused_id { @@ -2456,8 +2436,7 @@ impl MutableAppContext { } let mut subscriptions = this.focus_observations.clone(); - subscriptions - .emit_and_cleanup(focused_id, this, |callback, this| callback(true, this)); + subscriptions.emit(focused_id, this, |callback, this| callback(true, this)); } }) } @@ -5106,46 +5085,14 @@ impl Drop for ElementStateHandle { #[must_use] pub enum Subscription { - Subscription { - id: usize, - entity_id: usize, - subscriptions: Option>>, - }, - GlobalSubscription { - id: usize, - type_id: TypeId, - subscriptions: Option>>, - }, - Observation { - id: usize, - entity_id: usize, - observations: Option>>, - }, - GlobalObservation { - id: usize, - type_id: TypeId, - observations: Option>>, - }, - FocusObservation { - id: usize, - view_id: usize, - observations: Option>>, - }, - WindowActivationObservation { - id: usize, - window_id: usize, - observations: Option>>, - }, - WindowFullscreenObservation { - id: usize, - window_id: usize, - observations: Option>>, - }, - KeystrokeObservation { - id: usize, - window_id: usize, - observations: Option>>, - }, + Subscription(callback_collection::Subscription), + Observation(callback_collection::Subscription), + GlobalSubscription(callback_collection::Subscription), + GlobalObservation(callback_collection::Subscription), + FocusObservation(callback_collection::Subscription), + WindowActivationObservation(callback_collection::Subscription), + WindowFullscreenObservation(callback_collection::Subscription), + KeystrokeObservation(callback_collection::Subscription), ReleaseObservation { id: usize, @@ -5163,36 +5110,21 @@ pub enum Subscription { impl Subscription { pub fn detach(&mut self) { match self { - Subscription::Subscription { subscriptions, .. } => { - subscriptions.take(); - } - Subscription::GlobalSubscription { subscriptions, .. } => { - subscriptions.take(); - } - Subscription::Observation { observations, .. } => { - observations.take(); - } - Subscription::GlobalObservation { observations, .. } => { - observations.take(); - } + Subscription::Subscription(subscription) => subscription.detach(), + Subscription::GlobalSubscription(subscription) => subscription.detach(), + Subscription::Observation(subscription) => subscription.detach(), + Subscription::GlobalObservation(subscription) => subscription.detach(), + Subscription::FocusObservation(subscription) => subscription.detach(), + Subscription::KeystrokeObservation(subscription) => subscription.detach(), + Subscription::WindowActivationObservation(subscription) => subscription.detach(), + Subscription::WindowFullscreenObservation(subscription) => subscription.detach(), + Subscription::ReleaseObservation { observations, .. } => { observations.take(); } - Subscription::FocusObservation { observations, .. } => { - observations.take(); - } Subscription::ActionObservation { observations, .. } => { observations.take(); } - Subscription::KeystrokeObservation { observations, .. } => { - observations.take(); - } - Subscription::WindowActivationObservation { observations, .. } => { - observations.take(); - } - Subscription::WindowFullscreenObservation { observations, .. } => { - observations.take(); - } } } } @@ -5200,80 +5132,6 @@ impl Subscription { impl Drop for Subscription { fn drop(&mut self) { match self { - Subscription::Subscription { - id, - entity_id, - subscriptions, - } => { - if let Some(subscriptions) = subscriptions.as_ref().and_then(Weak::upgrade) { - match subscriptions - .lock() - .entry(*entity_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::GlobalSubscription { - id, - type_id, - subscriptions, - } => { - if let Some(subscriptions) = subscriptions.as_ref().and_then(Weak::upgrade) { - match subscriptions.lock().entry(*type_id).or_default().entry(*id) { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::Observation { - id, - entity_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations - .lock() - .entry(*entity_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::GlobalObservation { - id, - type_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations.lock().entry(*type_id).or_default().entry(*id) { - collections::btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - collections::btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } Subscription::ReleaseObservation { id, entity_id, @@ -5285,90 +5143,12 @@ impl Drop for Subscription { } } } - Subscription::FocusObservation { - id, - view_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations.lock().entry(*view_id).or_default().entry(*id) { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } Subscription::ActionObservation { id, observations } => { if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { observations.lock().remove(id); } } - Subscription::KeystrokeObservation { - id, - window_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations - .lock() - .entry(*window_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::WindowActivationObservation { - id, - window_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations - .lock() - .entry(*window_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::WindowFullscreenObservation { - id, - window_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations - .lock() - .entry(*window_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } + _ => {} } } } diff --git a/crates/gpui/src/app/callback_collection.rs b/crates/gpui/src/app/callback_collection.rs index 4ec90fbac0..4494e9073f 100644 --- a/crates/gpui/src/app/callback_collection.rs +++ b/crates/gpui/src/app/callback_collection.rs @@ -1,19 +1,70 @@ +use std::mem; use std::sync::Arc; use std::{hash::Hash, sync::Weak}; use parking_lot::Mutex; -use collections::{btree_map, BTreeMap, HashMap}; +use collections::{btree_map, BTreeMap, HashMap, HashSet}; use crate::MutableAppContext; -pub type Mapping = Mutex>>>; +// Problem 5: Current bug callbacks currently called many times after being dropped +// update +// notify +// observe (push effect) +// subscription {id : 5} +// pending: [Effect::Notify, Effect::observe { id: 5 }] +// drop observation subscription (write None into subscriptions) +// flush effects +// notify +// observe +// Problem 6: Key-value pair is leaked if you drop a callback while calling it, and then never call that set of callbacks again +// ----------------- +// Problem 1: Many very similar subscription enum variants +// Problem 2: Subscriptions and CallbackCollections use a shared mutex to update the callback status +// Problem 3: Current implementation is error prone with regard to uninitialized callbacks or dropping during callback +// Problem 4: Calling callbacks requires removing all of them from the list and adding them back -pub struct CallbackCollection { - internal: Arc>, +// Solution 1 CallbackState: +// Add more state to the CallbackCollection map to communicate dropped and initialized status +// Solves: P5 +// Solution 2 DroppedSubscriptionList: +// Store a parallel set of dropped subscriptions in the Mapping which stores the key and subscription id for all dropped subscriptions +// which can be +// Solution 3 GarbageCollection: +// Use some type of traditional garbage collection to handle dropping of callbacks +// atomic flag per callback which is looped over in remove dropped entities + +// TODO: +// - Move subscription id counter to CallbackCollection +// - Consider adding a reverse map in Mapping from subscription id to key so that the dropped subscriptions +// can be a hashset of usize and the Subscription doesn't need the key +// - Investigate why the remaining two types of callback lists can't use the same callback collection and subscriptions +pub struct Subscription { + key: K, + id: usize, + mapping: Option>>>, } -impl Clone for CallbackCollection { +struct Mapping { + callbacks: HashMap>, + dropped_subscriptions: HashSet<(K, usize)>, +} + +impl Default for Mapping { + fn default() -> Self { + Self { + callbacks: Default::default(), + dropped_subscriptions: Default::default(), + } + } +} + +pub(crate) struct CallbackCollection { + internal: Arc>>, +} + +impl Clone for CallbackCollection { fn clone(&self) -> Self { Self { internal: self.internal.clone(), @@ -21,7 +72,7 @@ impl Clone for CallbackCollection { } } -impl Default for CallbackCollection { +impl Default for CallbackCollection { fn default() -> Self { CallbackCollection { internal: Arc::new(Mutex::new(Default::default())), @@ -29,75 +80,101 @@ impl Default for CallbackCollection { } } -impl CallbackCollection { - pub fn downgrade(&self) -> Weak> { - Arc::downgrade(&self.internal) - } - +impl CallbackCollection { #[cfg(test)] pub fn is_empty(&self) -> bool { - self.internal.lock().is_empty() + self.internal.lock().callbacks.is_empty() } - pub fn add_callback(&mut self, id: K, subscription_id: usize, callback: F) { + pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription { + Subscription { + key, + id: subscription_id, + mapping: Some(Arc::downgrade(&self.internal)), + } + } + + pub fn count(&mut self, key: K) -> usize { self.internal .lock() - .entry(id) - .or_default() - .insert(subscription_id, Some(callback)); + .callbacks + .get(&key) + .map_or(0, |callbacks| callbacks.len()) } - pub fn remove(&mut self, id: K) { - self.internal.lock().remove(&id); + pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) { + let mut this = self.internal.lock(); + if !this.dropped_subscriptions.contains(&(key, subscription_id)) { + this.callbacks + .entry(key) + .or_default() + .insert(subscription_id, callback); + } } - pub fn add_or_remove_callback(&mut self, id: K, subscription_id: usize, callback: F) { - match self - .internal - .lock() - .entry(id) - .or_default() - .entry(subscription_id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(Some(callback)); - } + pub fn remove(&mut self, key: K) { + self.internal.lock().callbacks.remove(&key); + } - btree_map::Entry::Occupied(entry) => { - // TODO: This seems like it should never be called because no code - // should ever attempt to remove an existing callback - debug_assert!(entry.get().is_none()); - entry.remove(); + pub fn emit bool>( + &mut self, + key: K, + cx: &mut MutableAppContext, + mut call_callback: C, + ) { + let callbacks = self.internal.lock().callbacks.remove(&key); + if let Some(callbacks) = callbacks { + for (subscription_id, mut callback) in callbacks { + if !self + .internal + .lock() + .dropped_subscriptions + .contains(&(key, subscription_id)) + { + if call_callback(&mut callback, cx) { + self.add_callback(key, subscription_id, callback); + } + } } } } - pub fn emit_and_cleanup bool>( - &mut self, - id: K, - cx: &mut MutableAppContext, - mut call_callback: C, - ) { - let callbacks = self.internal.lock().remove(&id); - if let Some(callbacks) = callbacks { - for (subscription_id, callback) in callbacks { - if let Some(mut callback) = callback { - let alive = call_callback(&mut callback, cx); - if alive { - match self - .internal - .lock() - .entry(id) - .or_default() - .entry(subscription_id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(Some(callback)); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } + pub fn gc(&mut self) { + let mut this = self.internal.lock(); + + for (key, id) in mem::take(&mut this.dropped_subscriptions) { + if let Some(callbacks) = this.callbacks.get_mut(&key) { + callbacks.remove(&id); + } + } + } +} + +impl Subscription { + pub fn detach(&mut self) { + self.mapping.take(); + } +} + +impl Drop for Subscription { + // If the callback has been initialized (no callback in the list for the key and id), + // add this subscription id and key to the dropped subscriptions list + // Otherwise, just remove the associated callback from the callback collection + fn drop(&mut self) { + if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) { + let mut mapping = mapping.lock(); + if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) { + match callbacks.entry(self.id) { + btree_map::Entry::Vacant(_) => { + mapping + .dropped_subscriptions + .insert((self.key.clone(), self.id)); + } + btree_map::Entry::Occupied(entry) => { + entry.remove(); + mapping + .dropped_subscriptions + .insert((self.key.clone(), self.id)); } } }