From 378f0c32fe35c104288f8f24468a0bed247defff Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 5 Jan 2023 16:41:23 -0800 Subject: [PATCH 1/9] Restructure callback subscriptions Fix a callback leak that would occur when dropping a subscription to a callback collection after triggering that callback, but before processing the effect of *adding* the handler. Co-authored-by: Kay Simmons --- crates/gpui/src/app.rs | 360 ++++----------------- crates/gpui/src/app/callback_collection.rs | 197 +++++++---- 2 files changed, 207 insertions(+), 350 deletions(-) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 76b9fb1aa6..dce4e68e7c 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -26,8 +26,8 @@ use smallvec::SmallVec; use smol::prelude::*; pub use action::*; -use callback_collection::{CallbackCollection, Mapping}; -use collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque}; +use callback_collection::CallbackCollection; +use collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque}; use keymap::MatchResult; use platform::Event; #[cfg(any(test, feature = "test-support"))] @@ -1047,12 +1047,10 @@ impl MutableAppContext { callback(payload, cx) }), }); - - Subscription::GlobalSubscription { - id: subscription_id, - type_id, - subscriptions: Some(self.global_subscriptions.downgrade()), - } + Subscription::GlobalSubscription( + self.global_subscriptions + .subscribe(type_id, subscription_id), + ) } pub fn observe(&mut self, handle: &H, mut callback: F) -> Subscription @@ -1089,11 +1087,7 @@ impl MutableAppContext { } }), }); - Subscription::Subscription { - id: subscription_id, - entity_id: handle.id(), - subscriptions: Some(self.subscriptions.downgrade()), - } + Subscription::Subscription(self.subscriptions.subscribe(handle.id(), subscription_id)) } fn observe_internal(&mut self, handle: &H, mut callback: F) -> Subscription @@ -1117,11 +1111,7 @@ impl MutableAppContext { } }), }); - Subscription::Observation { - id: subscription_id, - entity_id, - observations: Some(self.observations.downgrade()), - } + Subscription::Observation(self.observations.subscribe(entity_id, subscription_id)) } fn observe_focus(&mut self, handle: &ViewHandle, mut callback: F) -> Subscription @@ -1144,12 +1134,7 @@ impl MutableAppContext { } }), }); - - Subscription::FocusObservation { - id: subscription_id, - view_id, - observations: Some(self.focus_observations.downgrade()), - } + Subscription::FocusObservation(self.focus_observations.subscribe(view_id, subscription_id)) } pub fn observe_global(&mut self, mut observe: F) -> Subscription @@ -1165,12 +1150,7 @@ impl MutableAppContext { id, Box::new(move |cx: &mut MutableAppContext| observe(cx)), ); - - Subscription::GlobalObservation { - id, - type_id, - observations: Some(self.global_observations.downgrade()), - } + Subscription::GlobalObservation(self.global_observations.subscribe(type_id, id)) } pub fn observe_default_global(&mut self, observe: F) -> Subscription @@ -1235,11 +1215,10 @@ impl MutableAppContext { subscription_id, callback: Box::new(callback), }); - Subscription::WindowActivationObservation { - id: subscription_id, - window_id, - observations: Some(self.window_activation_observations.downgrade()), - } + Subscription::WindowActivationObservation( + self.window_activation_observations + .subscribe(window_id, subscription_id), + ) } fn observe_fullscreen(&mut self, window_id: usize, callback: F) -> Subscription @@ -1253,11 +1232,10 @@ impl MutableAppContext { subscription_id, callback: Box::new(callback), }); - Subscription::WindowFullscreenObservation { - id: subscription_id, - window_id, - observations: Some(self.window_activation_observations.downgrade()), - } + Subscription::WindowActivationObservation( + self.window_activation_observations + .subscribe(window_id, subscription_id), + ) } pub fn observe_keystrokes(&mut self, window_id: usize, callback: F) -> Subscription @@ -1273,12 +1251,10 @@ impl MutableAppContext { let subscription_id = post_inc(&mut self.next_subscription_id); self.keystroke_observations .add_callback(window_id, subscription_id, Box::new(callback)); - - Subscription::KeystrokeObservation { - id: subscription_id, - window_id, - observations: Some(self.keystroke_observations.downgrade()), - } + Subscription::KeystrokeObservation( + self.keystroke_observations + .subscribe(window_id, subscription_id), + ) } pub fn defer(&mut self, callback: impl 'static + FnOnce(&mut MutableAppContext)) { @@ -1999,15 +1975,13 @@ impl MutableAppContext { entity_id, subscription_id, callback, - } => self.subscriptions.add_or_remove_callback( - entity_id, - subscription_id, - callback, - ), + } => self + .subscriptions + .add_callback(entity_id, subscription_id, callback), Effect::Event { entity_id, payload } => { let mut subscriptions = self.subscriptions.clone(); - subscriptions.emit_and_cleanup(entity_id, self, |callback, this| { + subscriptions.emit(entity_id, self, |callback, this| { callback(payload.as_ref(), this) }) } @@ -2016,7 +1990,7 @@ impl MutableAppContext { type_id, subscription_id, callback, - } => self.global_subscriptions.add_or_remove_callback( + } => self.global_subscriptions.add_callback( type_id, subscription_id, callback, @@ -2028,16 +2002,13 @@ impl MutableAppContext { entity_id, subscription_id, callback, - } => self.observations.add_or_remove_callback( - entity_id, - subscription_id, - callback, - ), + } => self + .observations + .add_callback(entity_id, subscription_id, callback), Effect::ModelNotification { model_id } => { let mut observations = self.observations.clone(); - observations - .emit_and_cleanup(model_id, self, |callback, this| callback(this)); + observations.emit(model_id, self, |callback, this| callback(this)); } Effect::ViewNotification { window_id, view_id } => { @@ -2046,7 +2017,7 @@ impl MutableAppContext { Effect::GlobalNotification { type_id } => { let mut subscriptions = self.global_observations.clone(); - subscriptions.emit_and_cleanup(type_id, self, |callback, this| { + subscriptions.emit(type_id, self, |callback, this| { callback(this); true }); @@ -2080,7 +2051,7 @@ impl MutableAppContext { subscription_id, callback, } => { - self.focus_observations.add_or_remove_callback( + self.focus_observations.add_callback( view_id, subscription_id, callback, @@ -2099,7 +2070,7 @@ impl MutableAppContext { window_id, subscription_id, callback, - } => self.window_activation_observations.add_or_remove_callback( + } => self.window_activation_observations.add_callback( window_id, subscription_id, callback, @@ -2114,7 +2085,7 @@ impl MutableAppContext { window_id, subscription_id, callback, - } => self.window_fullscreen_observations.add_or_remove_callback( + } => self.window_fullscreen_observations.add_callback( window_id, subscription_id, callback, @@ -2158,7 +2129,17 @@ impl MutableAppContext { self.pending_notifications.clear(); self.remove_dropped_entities(); } else { + self.focus_observations.gc(); + self.global_subscriptions.gc(); + self.global_observations.gc(); + self.subscriptions.gc(); + self.observations.gc(); + self.window_activation_observations.gc(); + self.window_fullscreen_observations.gc(); + self.keystroke_observations.gc(); + self.remove_dropped_entities(); + if refreshing { self.perform_window_refresh(); } else { @@ -2295,7 +2276,7 @@ impl MutableAppContext { let type_id = (&*payload).type_id(); let mut subscriptions = self.global_subscriptions.clone(); - subscriptions.emit_and_cleanup(type_id, self, |callback, this| { + subscriptions.emit(type_id, self, |callback, this| { callback(payload.as_ref(), this); true //Always alive }); @@ -2320,7 +2301,7 @@ impl MutableAppContext { } let mut observations = self.observations.clone(); - observations.emit_and_cleanup(observed_view_id, self, |callback, this| callback(this)); + observations.emit(observed_view_id, self, |callback, this| callback(this)); } } @@ -2350,7 +2331,7 @@ impl MutableAppContext { window.is_fullscreen = is_fullscreen; let mut observations = this.window_fullscreen_observations.clone(); - observations.emit_and_cleanup(window_id, this, |callback, this| { + observations.emit(window_id, this, |callback, this| { callback(is_fullscreen, this) }); @@ -2367,7 +2348,7 @@ impl MutableAppContext { ) { self.update(|this| { let mut observations = this.keystroke_observations.clone(); - observations.emit_and_cleanup(window_id, this, { + observations.emit(window_id, this, { move |callback, this| callback(&keystroke, &result, handled_by.as_ref(), this) }); }); @@ -2403,7 +2384,7 @@ impl MutableAppContext { } let mut observations = this.window_activation_observations.clone(); - observations.emit_and_cleanup(window_id, this, |callback, this| callback(active, this)); + observations.emit(window_id, this, |callback, this| callback(active, this)); Some(()) }); @@ -2443,8 +2424,7 @@ impl MutableAppContext { } let mut subscriptions = this.focus_observations.clone(); - subscriptions - .emit_and_cleanup(blurred_id, this, |callback, this| callback(false, this)); + subscriptions.emit(blurred_id, this, |callback, this| callback(false, this)); } if let Some(focused_id) = focused_id { @@ -2456,8 +2436,7 @@ impl MutableAppContext { } let mut subscriptions = this.focus_observations.clone(); - subscriptions - .emit_and_cleanup(focused_id, this, |callback, this| callback(true, this)); + subscriptions.emit(focused_id, this, |callback, this| callback(true, this)); } }) } @@ -5106,46 +5085,14 @@ impl Drop for ElementStateHandle { #[must_use] pub enum Subscription { - Subscription { - id: usize, - entity_id: usize, - subscriptions: Option>>, - }, - GlobalSubscription { - id: usize, - type_id: TypeId, - subscriptions: Option>>, - }, - Observation { - id: usize, - entity_id: usize, - observations: Option>>, - }, - GlobalObservation { - id: usize, - type_id: TypeId, - observations: Option>>, - }, - FocusObservation { - id: usize, - view_id: usize, - observations: Option>>, - }, - WindowActivationObservation { - id: usize, - window_id: usize, - observations: Option>>, - }, - WindowFullscreenObservation { - id: usize, - window_id: usize, - observations: Option>>, - }, - KeystrokeObservation { - id: usize, - window_id: usize, - observations: Option>>, - }, + Subscription(callback_collection::Subscription), + Observation(callback_collection::Subscription), + GlobalSubscription(callback_collection::Subscription), + GlobalObservation(callback_collection::Subscription), + FocusObservation(callback_collection::Subscription), + WindowActivationObservation(callback_collection::Subscription), + WindowFullscreenObservation(callback_collection::Subscription), + KeystrokeObservation(callback_collection::Subscription), ReleaseObservation { id: usize, @@ -5163,36 +5110,21 @@ pub enum Subscription { impl Subscription { pub fn detach(&mut self) { match self { - Subscription::Subscription { subscriptions, .. } => { - subscriptions.take(); - } - Subscription::GlobalSubscription { subscriptions, .. } => { - subscriptions.take(); - } - Subscription::Observation { observations, .. } => { - observations.take(); - } - Subscription::GlobalObservation { observations, .. } => { - observations.take(); - } + Subscription::Subscription(subscription) => subscription.detach(), + Subscription::GlobalSubscription(subscription) => subscription.detach(), + Subscription::Observation(subscription) => subscription.detach(), + Subscription::GlobalObservation(subscription) => subscription.detach(), + Subscription::FocusObservation(subscription) => subscription.detach(), + Subscription::KeystrokeObservation(subscription) => subscription.detach(), + Subscription::WindowActivationObservation(subscription) => subscription.detach(), + Subscription::WindowFullscreenObservation(subscription) => subscription.detach(), + Subscription::ReleaseObservation { observations, .. } => { observations.take(); } - Subscription::FocusObservation { observations, .. } => { - observations.take(); - } Subscription::ActionObservation { observations, .. } => { observations.take(); } - Subscription::KeystrokeObservation { observations, .. } => { - observations.take(); - } - Subscription::WindowActivationObservation { observations, .. } => { - observations.take(); - } - Subscription::WindowFullscreenObservation { observations, .. } => { - observations.take(); - } } } } @@ -5200,80 +5132,6 @@ impl Subscription { impl Drop for Subscription { fn drop(&mut self) { match self { - Subscription::Subscription { - id, - entity_id, - subscriptions, - } => { - if let Some(subscriptions) = subscriptions.as_ref().and_then(Weak::upgrade) { - match subscriptions - .lock() - .entry(*entity_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::GlobalSubscription { - id, - type_id, - subscriptions, - } => { - if let Some(subscriptions) = subscriptions.as_ref().and_then(Weak::upgrade) { - match subscriptions.lock().entry(*type_id).or_default().entry(*id) { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::Observation { - id, - entity_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations - .lock() - .entry(*entity_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::GlobalObservation { - id, - type_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations.lock().entry(*type_id).or_default().entry(*id) { - collections::btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - collections::btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } Subscription::ReleaseObservation { id, entity_id, @@ -5285,90 +5143,12 @@ impl Drop for Subscription { } } } - Subscription::FocusObservation { - id, - view_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations.lock().entry(*view_id).or_default().entry(*id) { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } Subscription::ActionObservation { id, observations } => { if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { observations.lock().remove(id); } } - Subscription::KeystrokeObservation { - id, - window_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations - .lock() - .entry(*window_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::WindowActivationObservation { - id, - window_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations - .lock() - .entry(*window_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } - Subscription::WindowFullscreenObservation { - id, - window_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - match observations - .lock() - .entry(*window_id) - .or_default() - .entry(*id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(None); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } - } - } + _ => {} } } } diff --git a/crates/gpui/src/app/callback_collection.rs b/crates/gpui/src/app/callback_collection.rs index 4ec90fbac0..4494e9073f 100644 --- a/crates/gpui/src/app/callback_collection.rs +++ b/crates/gpui/src/app/callback_collection.rs @@ -1,19 +1,70 @@ +use std::mem; use std::sync::Arc; use std::{hash::Hash, sync::Weak}; use parking_lot::Mutex; -use collections::{btree_map, BTreeMap, HashMap}; +use collections::{btree_map, BTreeMap, HashMap, HashSet}; use crate::MutableAppContext; -pub type Mapping = Mutex>>>; +// Problem 5: Current bug callbacks currently called many times after being dropped +// update +// notify +// observe (push effect) +// subscription {id : 5} +// pending: [Effect::Notify, Effect::observe { id: 5 }] +// drop observation subscription (write None into subscriptions) +// flush effects +// notify +// observe +// Problem 6: Key-value pair is leaked if you drop a callback while calling it, and then never call that set of callbacks again +// ----------------- +// Problem 1: Many very similar subscription enum variants +// Problem 2: Subscriptions and CallbackCollections use a shared mutex to update the callback status +// Problem 3: Current implementation is error prone with regard to uninitialized callbacks or dropping during callback +// Problem 4: Calling callbacks requires removing all of them from the list and adding them back -pub struct CallbackCollection { - internal: Arc>, +// Solution 1 CallbackState: +// Add more state to the CallbackCollection map to communicate dropped and initialized status +// Solves: P5 +// Solution 2 DroppedSubscriptionList: +// Store a parallel set of dropped subscriptions in the Mapping which stores the key and subscription id for all dropped subscriptions +// which can be +// Solution 3 GarbageCollection: +// Use some type of traditional garbage collection to handle dropping of callbacks +// atomic flag per callback which is looped over in remove dropped entities + +// TODO: +// - Move subscription id counter to CallbackCollection +// - Consider adding a reverse map in Mapping from subscription id to key so that the dropped subscriptions +// can be a hashset of usize and the Subscription doesn't need the key +// - Investigate why the remaining two types of callback lists can't use the same callback collection and subscriptions +pub struct Subscription { + key: K, + id: usize, + mapping: Option>>>, } -impl Clone for CallbackCollection { +struct Mapping { + callbacks: HashMap>, + dropped_subscriptions: HashSet<(K, usize)>, +} + +impl Default for Mapping { + fn default() -> Self { + Self { + callbacks: Default::default(), + dropped_subscriptions: Default::default(), + } + } +} + +pub(crate) struct CallbackCollection { + internal: Arc>>, +} + +impl Clone for CallbackCollection { fn clone(&self) -> Self { Self { internal: self.internal.clone(), @@ -21,7 +72,7 @@ impl Clone for CallbackCollection { } } -impl Default for CallbackCollection { +impl Default for CallbackCollection { fn default() -> Self { CallbackCollection { internal: Arc::new(Mutex::new(Default::default())), @@ -29,75 +80,101 @@ impl Default for CallbackCollection { } } -impl CallbackCollection { - pub fn downgrade(&self) -> Weak> { - Arc::downgrade(&self.internal) - } - +impl CallbackCollection { #[cfg(test)] pub fn is_empty(&self) -> bool { - self.internal.lock().is_empty() + self.internal.lock().callbacks.is_empty() } - pub fn add_callback(&mut self, id: K, subscription_id: usize, callback: F) { + pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription { + Subscription { + key, + id: subscription_id, + mapping: Some(Arc::downgrade(&self.internal)), + } + } + + pub fn count(&mut self, key: K) -> usize { self.internal .lock() - .entry(id) - .or_default() - .insert(subscription_id, Some(callback)); + .callbacks + .get(&key) + .map_or(0, |callbacks| callbacks.len()) } - pub fn remove(&mut self, id: K) { - self.internal.lock().remove(&id); + pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) { + let mut this = self.internal.lock(); + if !this.dropped_subscriptions.contains(&(key, subscription_id)) { + this.callbacks + .entry(key) + .or_default() + .insert(subscription_id, callback); + } } - pub fn add_or_remove_callback(&mut self, id: K, subscription_id: usize, callback: F) { - match self - .internal - .lock() - .entry(id) - .or_default() - .entry(subscription_id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(Some(callback)); - } + pub fn remove(&mut self, key: K) { + self.internal.lock().callbacks.remove(&key); + } - btree_map::Entry::Occupied(entry) => { - // TODO: This seems like it should never be called because no code - // should ever attempt to remove an existing callback - debug_assert!(entry.get().is_none()); - entry.remove(); + pub fn emit bool>( + &mut self, + key: K, + cx: &mut MutableAppContext, + mut call_callback: C, + ) { + let callbacks = self.internal.lock().callbacks.remove(&key); + if let Some(callbacks) = callbacks { + for (subscription_id, mut callback) in callbacks { + if !self + .internal + .lock() + .dropped_subscriptions + .contains(&(key, subscription_id)) + { + if call_callback(&mut callback, cx) { + self.add_callback(key, subscription_id, callback); + } + } } } } - pub fn emit_and_cleanup bool>( - &mut self, - id: K, - cx: &mut MutableAppContext, - mut call_callback: C, - ) { - let callbacks = self.internal.lock().remove(&id); - if let Some(callbacks) = callbacks { - for (subscription_id, callback) in callbacks { - if let Some(mut callback) = callback { - let alive = call_callback(&mut callback, cx); - if alive { - match self - .internal - .lock() - .entry(id) - .or_default() - .entry(subscription_id) - { - btree_map::Entry::Vacant(entry) => { - entry.insert(Some(callback)); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - } - } + pub fn gc(&mut self) { + let mut this = self.internal.lock(); + + for (key, id) in mem::take(&mut this.dropped_subscriptions) { + if let Some(callbacks) = this.callbacks.get_mut(&key) { + callbacks.remove(&id); + } + } + } +} + +impl Subscription { + pub fn detach(&mut self) { + self.mapping.take(); + } +} + +impl Drop for Subscription { + // If the callback has been initialized (no callback in the list for the key and id), + // add this subscription id and key to the dropped subscriptions list + // Otherwise, just remove the associated callback from the callback collection + fn drop(&mut self) { + if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) { + let mut mapping = mapping.lock(); + if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) { + match callbacks.entry(self.id) { + btree_map::Entry::Vacant(_) => { + mapping + .dropped_subscriptions + .insert((self.key.clone(), self.id)); + } + btree_map::Entry::Occupied(entry) => { + entry.remove(); + mapping + .dropped_subscriptions + .insert((self.key.clone(), self.id)); } } } From fa620bf98fc48cbe07e9a42c3cef95273c90c672 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 5 Jan 2023 17:30:39 -0800 Subject: [PATCH 2/9] Fix logic error in dropping callback subscriptions Co-authored-by: Kay Simmons --- crates/gpui/src/app/callback_collection.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/crates/gpui/src/app/callback_collection.rs b/crates/gpui/src/app/callback_collection.rs index 4494e9073f..38a8dae26c 100644 --- a/crates/gpui/src/app/callback_collection.rs +++ b/crates/gpui/src/app/callback_collection.rs @@ -1,10 +1,9 @@ -use std::mem; use std::sync::Arc; use std::{hash::Hash, sync::Weak}; use parking_lot::Mutex; -use collections::{btree_map, BTreeMap, HashMap, HashSet}; +use collections::{BTreeMap, HashMap, HashSet}; use crate::MutableAppContext; @@ -142,7 +141,7 @@ impl CallbackCollection { pub fn gc(&mut self) { let mut this = self.internal.lock(); - for (key, id) in mem::take(&mut this.dropped_subscriptions) { + for (key, id) in std::mem::take(&mut this.dropped_subscriptions) { if let Some(callbacks) = this.callbacks.get_mut(&key) { callbacks.remove(&id); } @@ -164,20 +163,13 @@ impl Drop for Subscription { if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) { let mut mapping = mapping.lock(); if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) { - match callbacks.entry(self.id) { - btree_map::Entry::Vacant(_) => { - mapping - .dropped_subscriptions - .insert((self.key.clone(), self.id)); - } - btree_map::Entry::Occupied(entry) => { - entry.remove(); - mapping - .dropped_subscriptions - .insert((self.key.clone(), self.id)); - } + if callbacks.remove(&self.id).is_some() { + return; } } + mapping + .dropped_subscriptions + .insert((self.key.clone(), self.id)); } } } From 82e9f736bd9050a75129ca3cc0de33ba5a1b0894 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 5 Jan 2023 18:02:53 -0800 Subject: [PATCH 3/9] Use a CallbackCollection for release observations Co-authored-by: Kay Simmons --- crates/gpui/src/app.rs | 79 +++++++++++++++--------------------------- 1 file changed, 28 insertions(+), 51 deletions(-) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index dce4e68e7c..fa01196d4b 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -588,9 +588,9 @@ type GlobalActionCallback = dyn FnMut(&dyn Action, &mut MutableAppContext); type SubscriptionCallback = Box bool>; type GlobalSubscriptionCallback = Box; type ObservationCallback = Box bool>; -type FocusObservationCallback = Box bool>; type GlobalObservationCallback = Box; -type ReleaseObservationCallback = Box; +type FocusObservationCallback = Box bool>; +type ReleaseObservationCallback = Box; type ActionObservationCallback = Box; type WindowActivationCallback = Box bool>; type WindowFullscreenCallback = Box bool>; @@ -615,18 +615,17 @@ pub struct MutableAppContext { next_subscription_id: usize, frame_count: usize, - focus_observations: CallbackCollection, - global_subscriptions: CallbackCollection, - global_observations: CallbackCollection, subscriptions: CallbackCollection, + global_subscriptions: CallbackCollection, observations: CallbackCollection, + global_observations: CallbackCollection, + focus_observations: CallbackCollection, + release_observations: CallbackCollection, + action_dispatch_observations: Arc>>, window_activation_observations: CallbackCollection, window_fullscreen_observations: CallbackCollection, keystroke_observations: CallbackCollection, - release_observations: Arc>>>, - action_dispatch_observations: Arc>>, - #[allow(clippy::type_complexity)] presenters_and_platform_windows: HashMap>, Box)>, @@ -1172,22 +1171,18 @@ impl MutableAppContext { F: 'static + FnOnce(&E, &mut Self), { let id = post_inc(&mut self.next_subscription_id); - self.release_observations - .lock() - .entry(handle.id()) - .or_default() - .insert( - id, - Box::new(move |released, cx| { - let released = released.downcast_ref().unwrap(); - callback(released, cx) - }), - ); - Subscription::ReleaseObservation { + let mut callback = Some(callback); + self.release_observations.add_callback( + handle.id(), id, - entity_id: handle.id(), - observations: Some(Arc::downgrade(&self.release_observations)), - } + Box::new(move |released, cx| { + let released = released.downcast_ref().unwrap(); + if let Some(callback) = callback.take() { + callback(released, cx) + } + }), + ); + Subscription::ReleaseObservation(self.release_observations.subscribe(handle.id(), id)) } pub fn observe_actions(&mut self, callback: F) -> Subscription @@ -2137,6 +2132,7 @@ impl MutableAppContext { self.window_activation_observations.gc(); self.window_fullscreen_observations.gc(); self.keystroke_observations.gc(); + self.release_observations.gc(); self.remove_dropped_entities(); @@ -2306,12 +2302,13 @@ impl MutableAppContext { } fn handle_entity_release_effect(&mut self, entity_id: usize, entity: &dyn Any) { - let callbacks = self.release_observations.lock().remove(&entity_id); - if let Some(callbacks) = callbacks { - for (_, callback) in callbacks { - callback(entity, self); - } - } + self.release_observations + .clone() + .emit(entity_id, self, |callback, this| { + callback(entity, this); + // Release observations happen one time. So clear the callback by returning false + false + }) } fn handle_fullscreen_effect(&mut self, window_id: usize, is_fullscreen: bool) { @@ -5093,14 +5090,8 @@ pub enum Subscription { WindowActivationObservation(callback_collection::Subscription), WindowFullscreenObservation(callback_collection::Subscription), KeystrokeObservation(callback_collection::Subscription), + ReleaseObservation(callback_collection::Subscription), - ReleaseObservation { - id: usize, - entity_id: usize, - #[allow(clippy::type_complexity)] - observations: - Option>>>>, - }, ActionObservation { id: usize, observations: Option>>>, @@ -5118,10 +5109,7 @@ impl Subscription { Subscription::KeystrokeObservation(subscription) => subscription.detach(), Subscription::WindowActivationObservation(subscription) => subscription.detach(), Subscription::WindowFullscreenObservation(subscription) => subscription.detach(), - - Subscription::ReleaseObservation { observations, .. } => { - observations.take(); - } + Subscription::ReleaseObservation(subscription) => subscription.detach(), Subscription::ActionObservation { observations, .. } => { observations.take(); } @@ -5132,17 +5120,6 @@ impl Subscription { impl Drop for Subscription { fn drop(&mut self) { match self { - Subscription::ReleaseObservation { - id, - entity_id, - observations, - } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - if let Some(observations) = observations.lock().get_mut(entity_id) { - observations.remove(id); - } - } - } Subscription::ActionObservation { id, observations } => { if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { observations.lock().remove(id); From 3da69117ae3bea0f9fef7503b79d37bc295c12fb Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 6 Jan 2023 09:15:53 -0800 Subject: [PATCH 4/9] Use a CallbackCollection for action dispatch observations --- crates/gpui/src/app.rs | 66 ++++++++++------------ crates/gpui/src/app/callback_collection.rs | 51 ++++------------- 2 files changed, 41 insertions(+), 76 deletions(-) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index fa01196d4b..aa1fd39015 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -27,7 +27,7 @@ use smol::prelude::*; pub use action::*; use callback_collection::CallbackCollection; -use collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque}; +use collections::{hash_map::Entry, HashMap, HashSet, VecDeque}; use keymap::MatchResult; use platform::Event; #[cfg(any(test, feature = "test-support"))] @@ -621,7 +621,7 @@ pub struct MutableAppContext { global_observations: CallbackCollection, focus_observations: CallbackCollection, release_observations: CallbackCollection, - action_dispatch_observations: Arc>>, + action_dispatch_observations: CallbackCollection<(), ActionObservationCallback>, window_activation_observations: CallbackCollection, window_fullscreen_observations: CallbackCollection, keystroke_observations: CallbackCollection, @@ -1189,14 +1189,13 @@ impl MutableAppContext { where F: 'static + FnMut(TypeId, &mut MutableAppContext), { - let id = post_inc(&mut self.next_subscription_id); + let subscription_id = post_inc(&mut self.next_subscription_id); self.action_dispatch_observations - .lock() - .insert(id, Box::new(callback)); - Subscription::ActionObservation { - id, - observations: Some(Arc::downgrade(&self.action_dispatch_observations)), - } + .add_callback((), subscription_id, Box::new(callback)); + Subscription::ActionObservation( + self.action_dispatch_observations + .subscribe((), subscription_id), + ) } fn observe_window_activation(&mut self, window_id: usize, callback: F) -> Subscription @@ -2489,11 +2488,12 @@ impl MutableAppContext { } fn handle_action_dispatch_notification_effect(&mut self, action_id: TypeId) { - let mut callbacks = mem::take(&mut *self.action_dispatch_observations.lock()); - for callback in callbacks.values_mut() { - callback(action_id, self); - } - self.action_dispatch_observations.lock().extend(callbacks); + self.action_dispatch_observations + .clone() + .emit((), self, |callback, this| { + callback(action_id, this); + true + }); } fn handle_window_should_close_subscription_effect( @@ -5091,14 +5091,25 @@ pub enum Subscription { WindowFullscreenObservation(callback_collection::Subscription), KeystrokeObservation(callback_collection::Subscription), ReleaseObservation(callback_collection::Subscription), - - ActionObservation { - id: usize, - observations: Option>>>, - }, + ActionObservation(callback_collection::Subscription<(), ActionObservationCallback>), } impl Subscription { + pub fn id(&self) -> usize { + match self { + Subscription::Subscription(subscription) => subscription.id(), + Subscription::Observation(subscription) => subscription.id(), + Subscription::GlobalSubscription(subscription) => subscription.id(), + Subscription::GlobalObservation(subscription) => subscription.id(), + Subscription::FocusObservation(subscription) => subscription.id(), + Subscription::WindowActivationObservation(subscription) => subscription.id(), + Subscription::WindowFullscreenObservation(subscription) => subscription.id(), + Subscription::KeystrokeObservation(subscription) => subscription.id(), + Subscription::ReleaseObservation(subscription) => subscription.id(), + Subscription::ActionObservation(subscription) => subscription.id(), + } + } + pub fn detach(&mut self) { match self { Subscription::Subscription(subscription) => subscription.detach(), @@ -5110,22 +5121,7 @@ impl Subscription { Subscription::WindowActivationObservation(subscription) => subscription.detach(), Subscription::WindowFullscreenObservation(subscription) => subscription.detach(), Subscription::ReleaseObservation(subscription) => subscription.detach(), - Subscription::ActionObservation { observations, .. } => { - observations.take(); - } - } - } -} - -impl Drop for Subscription { - fn drop(&mut self) { - match self { - Subscription::ActionObservation { id, observations } => { - if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) { - observations.lock().remove(id); - } - } - _ => {} + Subscription::ActionObservation(subscription) => subscription.detach(), } } } diff --git a/crates/gpui/src/app/callback_collection.rs b/crates/gpui/src/app/callback_collection.rs index 38a8dae26c..5bed9f7a29 100644 --- a/crates/gpui/src/app/callback_collection.rs +++ b/crates/gpui/src/app/callback_collection.rs @@ -1,44 +1,13 @@ +use crate::MutableAppContext; +use collections::{BTreeMap, HashMap, HashSet}; +use parking_lot::Mutex; use std::sync::Arc; use std::{hash::Hash, sync::Weak}; -use parking_lot::Mutex; +pub struct CallbackCollection { + internal: Arc>>, +} -use collections::{BTreeMap, HashMap, HashSet}; - -use crate::MutableAppContext; - -// Problem 5: Current bug callbacks currently called many times after being dropped -// update -// notify -// observe (push effect) -// subscription {id : 5} -// pending: [Effect::Notify, Effect::observe { id: 5 }] -// drop observation subscription (write None into subscriptions) -// flush effects -// notify -// observe -// Problem 6: Key-value pair is leaked if you drop a callback while calling it, and then never call that set of callbacks again -// ----------------- -// Problem 1: Many very similar subscription enum variants -// Problem 2: Subscriptions and CallbackCollections use a shared mutex to update the callback status -// Problem 3: Current implementation is error prone with regard to uninitialized callbacks or dropping during callback -// Problem 4: Calling callbacks requires removing all of them from the list and adding them back - -// Solution 1 CallbackState: -// Add more state to the CallbackCollection map to communicate dropped and initialized status -// Solves: P5 -// Solution 2 DroppedSubscriptionList: -// Store a parallel set of dropped subscriptions in the Mapping which stores the key and subscription id for all dropped subscriptions -// which can be -// Solution 3 GarbageCollection: -// Use some type of traditional garbage collection to handle dropping of callbacks -// atomic flag per callback which is looped over in remove dropped entities - -// TODO: -// - Move subscription id counter to CallbackCollection -// - Consider adding a reverse map in Mapping from subscription id to key so that the dropped subscriptions -// can be a hashset of usize and the Subscription doesn't need the key -// - Investigate why the remaining two types of callback lists can't use the same callback collection and subscriptions pub struct Subscription { key: K, id: usize, @@ -59,10 +28,6 @@ impl Default for Mapping { } } -pub(crate) struct CallbackCollection { - internal: Arc>>, -} - impl Clone for CallbackCollection { fn clone(&self) -> Self { Self { @@ -150,6 +115,10 @@ impl CallbackCollection { } impl Subscription { + pub fn id(&self) -> usize { + self.id + } + pub fn detach(&mut self) { self.mapping.take(); } From a165cd596bdc8b71a47130b9b8281b1f30659254 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 6 Jan 2023 09:56:29 -0800 Subject: [PATCH 5/9] Make event tests in gpui more consistent --- crates/gpui/src/app.rs | 254 +++++++++++------------------------------ 1 file changed, 68 insertions(+), 186 deletions(-) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index aa1fd39015..3299fbb746 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -5768,60 +5768,44 @@ mod tests { #[crate::test(self)] fn test_view_events(cx: &mut MutableAppContext) { - #[derive(Default)] - struct View { - events: Vec, - } - - impl Entity for View { - type Event = usize; - } - - impl super::View for View { - fn render(&mut self, _: &mut RenderContext) -> ElementBox { - Empty::new().boxed() - } - - fn ui_name() -> &'static str { - "View" - } - } - struct Model; impl Entity for Model { - type Event = usize; + type Event = String; } - let (_, handle_1) = cx.add_window(Default::default(), |_| View::default()); - let handle_2 = cx.add_view(&handle_1, |_| View::default()); + let (_, handle_1) = cx.add_window(Default::default(), |_| TestView::default()); + let handle_2 = cx.add_view(&handle_1, |_| TestView::default()); let handle_3 = cx.add_model(|_| Model); handle_1.update(cx, |_, cx| { cx.subscribe(&handle_2, move |me, emitter, event, cx| { - me.events.push(*event); + me.events.push(event.clone()); cx.subscribe(&emitter, |me, _, event, _| { - me.events.push(*event * 2); + me.events.push(format!("{event} from inner")); }) .detach(); }) .detach(); cx.subscribe(&handle_3, |me, _, event, _| { - me.events.push(*event); + me.events.push(event.clone()); }) .detach(); }); - handle_2.update(cx, |_, c| c.emit(7)); - assert_eq!(handle_1.read(cx).events, vec![7]); + handle_2.update(cx, |_, c| c.emit("7".into())); + assert_eq!(handle_1.read(cx).events, vec!["7"]); - handle_2.update(cx, |_, c| c.emit(5)); - assert_eq!(handle_1.read(cx).events, vec![7, 5, 10]); + handle_2.update(cx, |_, c| c.emit("5".into())); + assert_eq!(handle_1.read(cx).events, vec!["7", "5", "5 from inner"]); - handle_3.update(cx, |_, c| c.emit(9)); - assert_eq!(handle_1.read(cx).events, vec![7, 5, 10, 9]); + handle_3.update(cx, |_, c| c.emit("9".into())); + assert_eq!( + handle_1.read(cx).events, + vec!["7", "5", "5 from inner", "9"] + ); } #[crate::test(self)] @@ -6012,31 +5996,15 @@ mod tests { #[crate::test(self)] fn test_dropping_subscribers(cx: &mut MutableAppContext) { - struct View; - - impl Entity for View { - type Event = (); - } - - impl super::View for View { - fn render(&mut self, _: &mut RenderContext) -> ElementBox { - Empty::new().boxed() - } - - fn ui_name() -> &'static str { - "View" - } - } - struct Model; impl Entity for Model { type Event = (); } - let (_, root_view) = cx.add_window(Default::default(), |_| View); - let observing_view = cx.add_view(&root_view, |_| View); - let emitting_view = cx.add_view(&root_view, |_| View); + let (_, root_view) = cx.add_window(Default::default(), |_| TestView::default()); + let observing_view = cx.add_view(&root_view, |_| TestView::default()); + let emitting_view = cx.add_view(&root_view, |_| TestView::default()); let observing_model = cx.add_model(|_| Model); let observed_model = cx.add_model(|_| Model); @@ -6053,165 +6021,92 @@ mod tests { drop(observing_model); }); - emitting_view.update(cx, |_, cx| cx.emit(())); + emitting_view.update(cx, |_, cx| cx.emit(Default::default())); observed_model.update(cx, |_, cx| cx.emit(())); } #[crate::test(self)] fn test_view_emit_before_subscribe_in_same_update_cycle(cx: &mut MutableAppContext) { - #[derive(Default)] - struct TestView; - - impl Entity for TestView { - type Event = (); - } - - impl View for TestView { - fn ui_name() -> &'static str { - "TestView" - } - - fn render(&mut self, _: &mut RenderContext) -> ElementBox { - Empty::new().boxed() - } - } - - let events = Rc::new(RefCell::new(Vec::new())); - cx.add_window(Default::default(), |cx| { + let (_, view) = cx.add_window::(Default::default(), |cx| { drop(cx.subscribe(&cx.handle(), { - let events = events.clone(); - move |_, _, _, _| events.borrow_mut().push("dropped before flush") + move |this, _, _, _| this.events.push("dropped before flush".into()) })); cx.subscribe(&cx.handle(), { - let events = events.clone(); - move |_, _, _, _| events.borrow_mut().push("before emit") + move |this, _, _, _| this.events.push("before emit".into()) }) .detach(); - cx.emit(()); + cx.emit("the event".into()); cx.subscribe(&cx.handle(), { - let events = events.clone(); - move |_, _, _, _| events.borrow_mut().push("after emit") + move |this, _, _, _| this.events.push("after emit".into()) }) .detach(); - TestView + TestView { events: Vec::new() } }); - assert_eq!(*events.borrow(), ["before emit"]); + + assert_eq!(view.read(cx).events, ["before emit"]); } #[crate::test(self)] fn test_observe_and_notify_from_view(cx: &mut MutableAppContext) { - #[derive(Default)] - struct View { - events: Vec, - } - - impl Entity for View { - type Event = usize; - } - - impl super::View for View { - fn render(&mut self, _: &mut RenderContext) -> ElementBox { - Empty::new().boxed() - } - - fn ui_name() -> &'static str { - "View" - } - } - #[derive(Default)] struct Model { - count: usize, + state: String, } impl Entity for Model { type Event = (); } - let (_, view) = cx.add_window(Default::default(), |_| View::default()); - let model = cx.add_model(|_| Model::default()); + let (_, view) = cx.add_window(Default::default(), |_| TestView::default()); + let model = cx.add_model(|_| Model { + state: "old-state".into(), + }); view.update(cx, |_, c| { c.observe(&model, |me, observed, c| { - me.events.push(observed.read(c).count) + me.events.push(observed.read(c).state.clone()) }) .detach(); }); - model.update(cx, |model, c| { - model.count = 11; - c.notify(); + model.update(cx, |model, cx| { + model.state = "new-state".into(); + cx.notify(); }); - assert_eq!(view.read(cx).events, vec![11]); + assert_eq!(view.read(cx).events, vec!["new-state"]); } #[crate::test(self)] fn test_view_notify_before_observe_in_same_update_cycle(cx: &mut MutableAppContext) { - #[derive(Default)] - struct TestView; - - impl Entity for TestView { - type Event = (); - } - - impl View for TestView { - fn ui_name() -> &'static str { - "TestView" - } - - fn render(&mut self, _: &mut RenderContext) -> ElementBox { - Empty::new().boxed() - } - } - - let events = Rc::new(RefCell::new(Vec::new())); - cx.add_window(Default::default(), |cx| { + let (_, view) = cx.add_window::(Default::default(), |cx| { drop(cx.observe(&cx.handle(), { - let events = events.clone(); - move |_, _, _| events.borrow_mut().push("dropped before flush") + move |this, _, _| this.events.push("dropped before flush".into()) })); cx.observe(&cx.handle(), { - let events = events.clone(); - move |_, _, _| events.borrow_mut().push("before notify") + move |this, _, _| this.events.push("before notify".into()) }) .detach(); cx.notify(); cx.observe(&cx.handle(), { - let events = events.clone(); - move |_, _, _| events.borrow_mut().push("after notify") + move |this, _, _| this.events.push("after notify".into()) }) .detach(); - TestView + TestView { events: Vec::new() } }); - assert_eq!(*events.borrow(), ["before notify"]); + + assert_eq!(view.read(cx).events, ["before notify"]); } #[crate::test(self)] fn test_dropping_observers(cx: &mut MutableAppContext) { - struct View; - - impl Entity for View { - type Event = (); - } - - impl super::View for View { - fn render(&mut self, _: &mut RenderContext) -> ElementBox { - Empty::new().boxed() - } - - fn ui_name() -> &'static str { - "View" - } - } - struct Model; impl Entity for Model { type Event = (); } - let (_, root_view) = cx.add_window(Default::default(), |_| View); - let observing_view = cx.add_view(root_view, |_| View); + let (_, root_view) = cx.add_window(Default::default(), |_| TestView::default()); + let observing_view = cx.add_view(root_view, |_| TestView::default()); let observing_model = cx.add_model(|_| Model); let observed_model = cx.add_model(|_| Model); @@ -6927,47 +6822,15 @@ mod tests { #[crate::test(self)] #[should_panic] async fn test_view_condition_timeout(cx: &mut TestAppContext) { - struct View; - - impl super::Entity for View { - type Event = (); - } - - impl super::View for View { - fn ui_name() -> &'static str { - "test view" - } - - fn render(&mut self, _: &mut RenderContext) -> ElementBox { - Empty::new().boxed() - } - } - - let (_, view) = cx.add_window(|_| View); + let (_, view) = cx.add_window(|_| TestView::default()); view.condition(cx, |_, _| false).await; } #[crate::test(self)] #[should_panic(expected = "view dropped with pending condition")] async fn test_view_condition_panic_on_drop(cx: &mut TestAppContext) { - struct View; - - impl super::Entity for View { - type Event = (); - } - - impl super::View for View { - fn ui_name() -> &'static str { - "test view" - } - - fn render(&mut self, _: &mut RenderContext) -> ElementBox { - Empty::new().boxed() - } - } - - let (_, root_view) = cx.add_window(|_| View); - let view = cx.add_view(&root_view, |_| View); + let (_, root_view) = cx.add_window(|_| TestView::default()); + let view = cx.add_view(&root_view, |_| TestView::default()); let condition = view.condition(cx, |_, _| false); cx.update(|_| drop(view)); @@ -7180,4 +7043,23 @@ mod tests { assert!(!child_rendered.take()); assert!(child_dropped.take()); } + + #[derive(Default)] + struct TestView { + events: Vec, + } + + impl Entity for TestView { + type Event = String; + } + + impl View for TestView { + fn ui_name() -> &'static str { + "TestView" + } + + fn render(&mut self, _: &mut RenderContext) -> ElementBox { + Empty::new().boxed() + } + } } From 4708f5d88f85e6821c62deea2bfd3cfc1e0f37eb Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 6 Jan 2023 10:44:37 -0800 Subject: [PATCH 6/9] Add test for notifying and dropping subscriptions in an update cycle --- crates/gpui/src/app.rs | 25 ++++++++++++++++++++++ crates/gpui/src/app/callback_collection.rs | 12 +++-------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 3299fbb746..4a5b6f54be 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -6097,6 +6097,31 @@ mod tests { assert_eq!(view.read(cx).events, ["before notify"]); } + #[crate::test(self)] + fn test_notify_and_drop_observe_subscription_in_same_update_cycle(cx: &mut MutableAppContext) { + struct Model; + impl Entity for Model { + type Event = (); + } + + let model = cx.add_model(|_| Model); + let (_, view) = cx.add_window(Default::default(), |_| TestView::default()); + + view.update(cx, |_, cx| { + model.update(cx, |_, cx| cx.notify()); + drop(cx.observe(&model, move |this, _, _| { + this.events.push("model notified".into()); + })); + model.update(cx, |_, cx| cx.notify()); + }); + + for _ in 0..3 { + model.update(cx, |_, cx| cx.notify()); + } + + assert_eq!(view.read(cx).events, Vec::::new()); + } + #[crate::test(self)] fn test_dropping_observers(cx: &mut MutableAppContext) { struct Model; diff --git a/crates/gpui/src/app/callback_collection.rs b/crates/gpui/src/app/callback_collection.rs index 5bed9f7a29..43f4f3f62e 100644 --- a/crates/gpui/src/app/callback_collection.rs +++ b/crates/gpui/src/app/callback_collection.rs @@ -58,14 +58,6 @@ impl CallbackCollection { } } - pub fn count(&mut self, key: K) -> usize { - self.internal - .lock() - .callbacks - .get(&key) - .map_or(0, |callbacks| callbacks.len()) - } - pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) { let mut this = self.internal.lock(); if !this.dropped_subscriptions.contains(&(key, subscription_id)) { @@ -77,7 +69,9 @@ impl CallbackCollection { } pub fn remove(&mut self, key: K) { - self.internal.lock().callbacks.remove(&key); + let callbacks = self.internal.lock().callbacks.remove(&key); + // drop these after releasing the lock + drop(callbacks); } pub fn emit bool>( From ef192a902a030ee2a4a15c054f23563dc8e91ee6 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 6 Jan 2023 11:03:45 -0800 Subject: [PATCH 7/9] Remove dropped subscription eagerly when removing callbacks --- crates/gpui/src/app/callback_collection.rs | 48 +++++++++++++++------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/crates/gpui/src/app/callback_collection.rs b/crates/gpui/src/app/callback_collection.rs index 43f4f3f62e..44db224967 100644 --- a/crates/gpui/src/app/callback_collection.rs +++ b/crates/gpui/src/app/callback_collection.rs @@ -60,17 +60,24 @@ impl CallbackCollection { pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) { let mut this = self.internal.lock(); - if !this.dropped_subscriptions.contains(&(key, subscription_id)) { - this.callbacks - .entry(key) - .or_default() - .insert(subscription_id, callback); + + // If this callback's subscription was dropped before the callback was + // added, then just drop the callback. + if this.dropped_subscriptions.remove(&(key, subscription_id)) { + return; } + + this.callbacks + .entry(key) + .or_default() + .insert(subscription_id, callback); } pub fn remove(&mut self, key: K) { let callbacks = self.internal.lock().callbacks.remove(&key); - // drop these after releasing the lock + + // Drop these callbacks after releasing the lock, in case one of them + // owns a subscription to this callback collection. drop(callbacks); } @@ -83,15 +90,19 @@ impl CallbackCollection { let callbacks = self.internal.lock().callbacks.remove(&key); if let Some(callbacks) = callbacks { for (subscription_id, mut callback) in callbacks { - if !self + // If this callback's subscription was dropped while invoking an + // earlier callback, then just drop this callback. + if self .internal .lock() .dropped_subscriptions - .contains(&(key, subscription_id)) + .remove(&(key, subscription_id)) { - if call_callback(&mut callback, cx) { - self.add_callback(key, subscription_id, callback); - } + continue; + } + + if call_callback(&mut callback, cx) { + self.add_callback(key, subscription_id, callback); } } } @@ -119,17 +130,24 @@ impl Subscription { } impl Drop for Subscription { - // If the callback has been initialized (no callback in the list for the key and id), - // add this subscription id and key to the dropped subscriptions list - // Otherwise, just remove the associated callback from the callback collection fn drop(&mut self) { if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) { let mut mapping = mapping.lock(); + + // If the callback is present in the mapping, then just remove it. if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) { - if callbacks.remove(&self.id).is_some() { + let callback = callbacks.remove(&self.id); + if callback.is_some() { + drop(mapping); + drop(callback); return; } } + + // If this subscription's callback is not present, then either it has been + // temporarily removed during emit, or it has not yet been added. Record + // that this subscription has been dropped so that the callback can be + // removed later. mapping .dropped_subscriptions .insert((self.key.clone(), self.id)); From 53cb3a4429cbce6417fbf1dc07c87f108f8453f2 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 6 Jan 2023 11:33:50 -0800 Subject: [PATCH 8/9] Remove GC step for callback collections, always drop callbacks asap --- crates/gpui/src/app.rs | 10 ---- crates/gpui/src/app/callback_collection.rs | 65 ++++++++++++++-------- 2 files changed, 42 insertions(+), 33 deletions(-) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 4a5b6f54be..5568155cf7 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -2123,16 +2123,6 @@ impl MutableAppContext { self.pending_notifications.clear(); self.remove_dropped_entities(); } else { - self.focus_observations.gc(); - self.global_subscriptions.gc(); - self.global_observations.gc(); - self.subscriptions.gc(); - self.observations.gc(); - self.window_activation_observations.gc(); - self.window_fullscreen_observations.gc(); - self.keystroke_observations.gc(); - self.release_observations.gc(); - self.remove_dropped_entities(); if refreshing { diff --git a/crates/gpui/src/app/callback_collection.rs b/crates/gpui/src/app/callback_collection.rs index 44db224967..45479eabad 100644 --- a/crates/gpui/src/app/callback_collection.rs +++ b/crates/gpui/src/app/callback_collection.rs @@ -16,7 +16,17 @@ pub struct Subscription { struct Mapping { callbacks: HashMap>, - dropped_subscriptions: HashSet<(K, usize)>, + dropped_subscriptions: HashMap>, +} + +impl Mapping { + fn clear_dropped_state(&mut self, key: &K, subscription_id: usize) -> bool { + if let Some(subscriptions) = self.dropped_subscriptions.get_mut(&key) { + subscriptions.remove(&subscription_id) + } else { + false + } + } } impl Default for Mapping { @@ -50,6 +60,14 @@ impl CallbackCollection { self.internal.lock().callbacks.is_empty() } + pub fn count(&self, key: K) -> usize { + self.internal + .lock() + .callbacks + .get(&key) + .map_or(0, |callbacks| callbacks.len()) + } + pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription { Subscription { key, @@ -63,7 +81,7 @@ impl CallbackCollection { // If this callback's subscription was dropped before the callback was // added, then just drop the callback. - if this.dropped_subscriptions.remove(&(key, subscription_id)) { + if this.clear_dropped_state(&key, subscription_id) { return; } @@ -74,10 +92,12 @@ impl CallbackCollection { } pub fn remove(&mut self, key: K) { - let callbacks = self.internal.lock().callbacks.remove(&key); - // Drop these callbacks after releasing the lock, in case one of them // owns a subscription to this callback collection. + let mut this = self.internal.lock(); + let callbacks = this.callbacks.remove(&key); + this.dropped_subscriptions.remove(&key); + drop(this); drop(callbacks); } @@ -91,29 +111,26 @@ impl CallbackCollection { if let Some(callbacks) = callbacks { for (subscription_id, mut callback) in callbacks { // If this callback's subscription was dropped while invoking an - // earlier callback, then just drop this callback. - if self - .internal - .lock() - .dropped_subscriptions - .remove(&(key, subscription_id)) - { + // earlier callback, then just drop the callback. + let mut this = self.internal.lock(); + if this.clear_dropped_state(&key, subscription_id) { continue; } - if call_callback(&mut callback, cx) { - self.add_callback(key, subscription_id, callback); + drop(this); + let alive = call_callback(&mut callback, cx); + + // If this callback's subscription was dropped while invoking the callback + // itself, or if the callback returns false, then just drop the callback. + let mut this = self.internal.lock(); + if this.clear_dropped_state(&key, subscription_id) || !alive { + continue; } - } - } - } - pub fn gc(&mut self) { - let mut this = self.internal.lock(); - - for (key, id) in std::mem::take(&mut this.dropped_subscriptions) { - if let Some(callbacks) = this.callbacks.get_mut(&key) { - callbacks.remove(&id); + this.callbacks + .entry(key) + .or_default() + .insert(subscription_id, callback); } } } @@ -150,7 +167,9 @@ impl Drop for Subscription { // removed later. mapping .dropped_subscriptions - .insert((self.key.clone(), self.id)); + .entry(self.key.clone()) + .or_default() + .insert(self.id); } } } From b762d70202ffb71b053b4f83b085d8db0b710da1 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 6 Jan 2023 11:51:36 -0800 Subject: [PATCH 9/9] Remove unused CallbackCollection method --- crates/gpui/src/app/callback_collection.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/crates/gpui/src/app/callback_collection.rs b/crates/gpui/src/app/callback_collection.rs index 45479eabad..7496d71501 100644 --- a/crates/gpui/src/app/callback_collection.rs +++ b/crates/gpui/src/app/callback_collection.rs @@ -60,14 +60,6 @@ impl CallbackCollection { self.internal.lock().callbacks.is_empty() } - pub fn count(&self, key: K) -> usize { - self.internal - .lock() - .callbacks - .get(&key) - .map_or(0, |callbacks| callbacks.len()) - } - pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription { Subscription { key,