From 08d53cae93246f598e4c26de388d45d3ba460fd8 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 3 Sep 2024 14:04:43 +0800 Subject: [PATCH] feat: subscribe for local updates (#444) --- .vscode/settings.json | 3 +- crates/loro-internal/src/encoding.rs | 24 +- .../src/encoding/fast_snapshot.rs | 18 +- crates/loro-internal/src/lib.rs | 1 + crates/loro-internal/src/loro.rs | 58 ++- crates/loro-internal/src/obs.rs | 1 + crates/loro-internal/src/oplog.rs | 5 + .../loro-internal/src/oplog/change_store.rs | 27 ++ crates/loro-internal/src/txn.rs | 12 +- crates/loro-internal/src/utils/mod.rs | 1 + .../loro-internal/src/utils/subscription.rs | 427 ++++++++++++++++++ crates/loro-wasm/src/lib.rs | 64 +++ crates/loro/src/lib.rs | 17 +- crates/loro/tests/loro_rust_test.rs | 61 ++- loro-js/tests/event.test.ts | 118 +++++ 15 files changed, 797 insertions(+), 40 deletions(-) create mode 100644 crates/loro-internal/src/utils/subscription.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index c70ac2c3..1e8c342e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,6 +10,7 @@ "flate", "fuzzer", "gmax", + "GPUI", "heapless", "Helloii", "Hoexllo", @@ -50,7 +51,7 @@ "DEBUG": "*" }, "rust-analyzer.cargo.features": [ - "test_utils", + // "test_utils", // "counter" ], "editor.defaultFormatter": "rust-lang.rust-analyzer", diff --git a/crates/loro-internal/src/encoding.rs b/crates/loro-internal/src/encoding.rs index 3a01585d..e432b22b 100644 --- a/crates/loro-internal/src/encoding.rs +++ b/crates/loro-internal/src/encoding.rs @@ -13,16 +13,18 @@ use crate::op::OpWithId; use crate::version::Frontiers; use crate::LoroDoc; use crate::{oplog::OpLog, LoroError, VersionVector}; -use loro_common::{IdLpSpan, LoroResult, PeerID}; +use loro_common::{IdLpSpan, IdSpan, LoroResult, PeerID}; use num_traits::{FromPrimitive, ToPrimitive}; use rle::{HasLength, Sliceable}; use serde::{Deserialize, Serialize}; pub(crate) use value::OwnedValue; #[non_exhaustive] +#[derive(Debug, Clone, Copy)] pub enum ExportMode<'a> { Snapshot, - Updates(&'a VersionVector), + Updates { from: &'a VersionVector }, + UpdatesInRange { spans: &'a [IdSpan] }, GcSnapshot(&'a Frontiers), } @@ -301,6 +303,24 @@ pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec ans } +pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec { + // HEADER + let mut ans = Vec::with_capacity(MIN_HEADER_SIZE); + ans.extend(MAGIC_BYTES); + let checksum = [0; 16]; + ans.extend(checksum); + ans.extend(EncodeMode::FastUpdates.to_bytes()); + + // BODY + fast_snapshot::encode_updates_in_range(oplog, spans, &mut ans); + + // CHECKSUM in HEADER + let checksum_body = &ans[20..]; + let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED); + ans[16..20].copy_from_slice(&checksum.to_le_bytes()); + ans +} + pub(crate) fn export_gc_snapshot(doc: &LoroDoc, f: &Frontiers) -> Vec { // HEADER let mut ans = Vec::with_capacity(MIN_HEADER_SIZE); diff --git a/crates/loro-internal/src/encoding/fast_snapshot.rs b/crates/loro-internal/src/encoding/fast_snapshot.rs index 29494ee9..1b35bd6b 100644 --- a/crates/loro-internal/src/encoding/fast_snapshot.rs +++ b/crates/loro-internal/src/encoding/fast_snapshot.rs @@ -17,7 +17,7 @@ use std::io::{Read, Write}; use crate::{oplog::ChangeStore, LoroDoc, OpLog, VersionVector}; use bytes::{Buf, Bytes}; -use loro_common::{LoroError, LoroResult}; +use loro_common::{IdSpan, LoroError, LoroResult}; use super::encode_reordered::import_changes_to_oplog; @@ -159,6 +159,22 @@ pub(crate) fn encode_updates(doc: &LoroDoc, vv: &VersionVecto ); } +pub(crate) fn encode_updates_in_range( + oplog: &OpLog, + spans: &[IdSpan], + w: &mut W, +) { + let bytes = oplog.export_from_fast_in_range(spans); + _encode_snapshot( + Snapshot { + oplog_bytes: bytes, + state_bytes: Bytes::new(), + gc_bytes: Bytes::new(), + }, + w, + ); +} + pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> { let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()); let oplog_bytes = &bytes[4..4 + oplog_len as usize]; diff --git a/crates/loro-internal/src/lib.rs b/crates/loro-internal/src/lib.rs index 7fd86641..bb9a5b45 100644 --- a/crates/loro-internal/src/lib.rs +++ b/crates/loro-internal/src/lib.rs @@ -22,6 +22,7 @@ pub use loro_common; pub use oplog::OpLog; pub use state::DocState; pub use undo::UndoManager; +pub use utils::subscription::Subscription; pub mod awareness; pub mod cursor; mod kv_store; diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 5e264250..082ede39 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -28,19 +28,20 @@ use crate::{ dag::DagUtils, diff_calc::DiffCalculator, encoding::{ - decode_snapshot, export_fast_snapshot, export_fast_updates, export_gc_snapshot, - export_snapshot, json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, - ParsedHeaderAndBody, + decode_snapshot, export_fast_snapshot, export_fast_updates, export_fast_updates_in_range, + export_gc_snapshot, export_snapshot, json_schema::json::JsonSchema, parse_header_and_body, + EncodeMode, ParsedHeaderAndBody, }, event::{str_to_path, EventTriggerKind, Index, InternalDocDiff}, handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler}, id::PeerID, - obs::{Observer, SubID, Subscriber}, + obs::{LocalUpdateCallback, Observer, SubID, Subscriber}, op::InnerContent, oplog::{loro_dag::FrontiersNotIncluded, OpLog}, state::DocState, txn::Transaction, undo::DiffBatch, + utils::subscription::{SubscriberSet, Subscription}, version::{Frontiers, ImVersionVector}, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler, VersionVector, }; @@ -71,6 +72,7 @@ pub struct LoroDoc { arena: SharedArena, config: Configure, observer: Arc, + local_update_subs: SubscriberSet<(), LocalUpdateCallback>, diff_calculator: Arc>, // when dropping the doc, the txn will be committed txn: Arc>>, @@ -108,6 +110,7 @@ impl LoroDoc { config, detached: AtomicBool::new(false), auto_commit: AtomicBool::new(false), + local_update_subs: SubscriberSet::new(), observer: Arc::new(Observer::new(arena.clone())), diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))), txn: global_txn, @@ -136,6 +139,7 @@ impl LoroDoc { arena, config, observer: Arc::new(Observer::new(self.arena.clone())), + local_update_subs: SubscriberSet::new(), diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))), txn, auto_commit: AtomicBool::new(false), @@ -222,22 +226,6 @@ impl LoroDoc { self.detached.load(Acquire) } - #[allow(unused)] - pub(super) fn from_existing(oplog: OpLog, state: DocState) -> Self { - let obs = Observer::new(oplog.arena.clone()); - Self { - arena: oplog.arena.clone(), - observer: Arc::new(obs), - config: Default::default(), - auto_commit: AtomicBool::new(false), - oplog: Arc::new(Mutex::new(oplog)), - state: Arc::new(Mutex::new(state)), - diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))), - txn: Arc::new(Mutex::new(None)), - detached: AtomicBool::new(false), - } - } - #[inline(always)] pub fn peer_id(&self) -> PeerID { self.state @@ -377,6 +365,7 @@ impl LoroDoc { txn.set_msg(Some(msg.clone())); } + let id_span = txn.id_span(); txn.commit().unwrap(); if config.immediate_renew { let mut txn_guard = self.txn.try_lock().unwrap(); @@ -385,7 +374,7 @@ impl LoroDoc { } if let Some(on_commit) = on_commit { - on_commit(&self.state); + on_commit(&self.state, &self.oplog, id_span); } } @@ -440,13 +429,27 @@ impl LoroDoc { ); let obs = self.observer.clone(); - txn.set_on_commit(Box::new(move |state| { + let local_update_subs = self.local_update_subs.clone(); + txn.set_on_commit(Box::new(move |state, oplog, id_span| { let mut state = state.try_lock().unwrap(); let events = state.take_events(); drop(state); for event in events { obs.emit(event); } + + if !local_update_subs.is_empty() { + trace!("exporting fast updates"); + let bytes = + { export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) }; + trace!("exported fast updates"); + local_update_subs.retain(&(), |callback| { + trace!("calling callback"); + callback(&bytes); + trace!("called callback"); + true + }); + } })); Ok(txn) @@ -1057,6 +1060,12 @@ impl LoroDoc { self.observer.unsubscribe(id); } + pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription { + let (sub, activate) = self.local_update_subs.insert((), callback); + activate(); + sub + } + // PERF: opt #[tracing::instrument(skip_all)] pub fn import_batch(&self, bytes: &[Vec]) -> LoroResult<()> { @@ -1457,7 +1466,10 @@ impl LoroDoc { self.commit_then_stop(); let ans = match mode { ExportMode::Snapshot => export_fast_snapshot(self), - ExportMode::Updates(vv) => export_fast_updates(self, vv), + ExportMode::Updates { from: vv } => export_fast_updates(self, vv), + ExportMode::UpdatesInRange { spans } => { + export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans) + } ExportMode::GcSnapshot(f) => export_gc_snapshot(self, f), }; diff --git a/crates/loro-internal/src/obs.rs b/crates/loro-internal/src/obs.rs index d6c5b307..391bf4b8 100644 --- a/crates/loro-internal/src/obs.rs +++ b/crates/loro-internal/src/obs.rs @@ -15,6 +15,7 @@ use super::{ event::{DiffEvent, DocDiff}, }; +pub type LocalUpdateCallback = Box; pub type Subscriber = Arc Fn(DiffEvent<'a>)) + Send + Sync>; #[derive(Default)] diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index d73fd52e..df1dd43e 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -371,6 +371,11 @@ impl OpLog { .export_from(vv, self.vv(), self.frontiers()) } + #[inline(always)] + pub(crate) fn export_from_fast_in_range(&self, spans: &[IdSpan]) -> Bytes { + self.change_store.export_from_fast_in_range(spans) + } + #[inline(always)] pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<(), LoroError> { decode_oplog(self, data) diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 88a93d2b..4a36ab63 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -176,6 +176,33 @@ impl ChangeStore { new_store.encode_from(start_vv, &start_frontiers, latest_vv, latest_frontiers) } + pub(super) fn export_from_fast_in_range(&self, spans: &[IdSpan]) -> Bytes { + let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone()); + let latest_vv: VersionVector = spans.iter().map(|x| x.id_last()).collect(); + let start_vv: VersionVector = spans.iter().map(|x| x.id_start()).collect(); + for span in spans { + let mut span = *span; + span.normalize_(); + // PERF: this can be optimized by reusing the current encoded blocks + // In the current method, it needs to parse and re-encode the blocks + for c in self.iter_changes(span) { + let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len()); + let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len()); + + assert_ne!(start, end); + let ch = c.slice(start, end); + new_store.insert_change(ch, false); + } + } + + new_store.encode_from( + &start_vv, + &Default::default(), + &latest_vv, + &Default::default(), + ) + } + fn encode_from( &self, start_vv: &VersionVector, diff --git a/crates/loro-internal/src/txn.rs b/crates/loro-internal/src/txn.rs index 5b387ad1..5caa7926 100644 --- a/crates/loro-internal/src/txn.rs +++ b/crates/loro-internal/src/txn.rs @@ -7,7 +7,7 @@ use std::{ use enum_as_inner::EnumAsInner; use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable}; -use loro_common::{ContainerType, IdLp, LoroResult}; +use loro_common::{ContainerType, IdLp, IdSpan, LoroResult}; use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder}; use rle::{HasLength, Mergable, RleVec}; use smallvec::{smallvec, SmallVec}; @@ -38,7 +38,8 @@ use super::{ state::DocState, }; -pub type OnCommitFn = Box>) + Sync + Send>; +pub(crate) type OnCommitFn = + Box>, &Arc>, IdSpan) + Sync + Send>; pub struct Transaction { global_txn: Weak>>, @@ -371,7 +372,7 @@ impl Transaction { drop(state); drop(oplog); if let Some(on_commit) = self.on_commit.take() { - on_commit(&self.state); + on_commit(&self.state, &self.oplog, self.id_span()); } Ok(()) } @@ -503,6 +504,11 @@ impl Transaction { } } + #[inline] + pub fn id_span(&self) -> IdSpan { + IdSpan::new(self.peer, self.start_counter, self.next_counter) + } + pub fn next_idlp(&self) -> IdLp { IdLp { peer: self.peer, diff --git a/crates/loro-internal/src/utils/mod.rs b/crates/loro-internal/src/utils/mod.rs index f8101eff..9feef9be 100644 --- a/crates/loro-internal/src/utils/mod.rs +++ b/crates/loro-internal/src/utils/mod.rs @@ -2,4 +2,5 @@ pub(crate) mod kv_wrapper; pub(crate) mod lazy; pub(crate) mod query_by_len; pub mod string_slice; +pub(crate) mod subscription; pub(crate) mod utf16; diff --git a/crates/loro-internal/src/utils/subscription.rs b/crates/loro-internal/src/utils/subscription.rs new file mode 100644 index 00000000..a4c2006f --- /dev/null +++ b/crates/loro-internal/src/utils/subscription.rs @@ -0,0 +1,427 @@ +/* +This file is modified from the original file in the following repo: +https://github.com/zed-industries/zed + + +Copyright 2022 - 2024 Zed Industries, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + + http://www.apache.org/licenses/LICENSE-2.0 + + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + + + +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + + 1. Definitions. + + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + + END OF TERMS AND CONDITIONS + +*/ +use std::collections::{BTreeMap, BTreeSet}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; +use std::{fmt::Debug, mem, sync::Arc}; + +pub(crate) struct SubscriberSet( + Arc>>, +); + +impl Clone for SubscriberSet { + fn clone(&self) -> Self { + SubscriberSet(self.0.clone()) + } +} + +struct SubscriberSetState { + subscribers: BTreeMap>>>, + dropped_subscribers: BTreeSet<(EmitterKey, usize)>, + next_subscriber_id: usize, +} + +struct Subscriber { + active: Arc, + callback: Callback, +} + +impl SubscriberSet +where + EmitterKey: 'static + Ord + Clone + Debug, + Callback: 'static, +{ + pub fn new() -> Self { + Self(Arc::new(Mutex::new(SubscriberSetState { + subscribers: Default::default(), + dropped_subscribers: Default::default(), + next_subscriber_id: 0, + }))) + } + + /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions + /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`. + /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter + /// to activate the [`Subscription`]. + pub fn insert( + &self, + emitter_key: EmitterKey, + callback: Callback, + ) -> (Subscription, impl FnOnce()) { + let active = Arc::new(AtomicBool::new(false)); + let mut lock = self.0.lock().unwrap(); + let subscriber_id = post_inc(&mut lock.next_subscriber_id); + lock.subscribers + .entry(emitter_key.clone()) + .or_default() + .get_or_insert_with(Default::default) + .insert( + subscriber_id, + Subscriber { + active: active.clone(), + callback, + }, + ); + let this = self.0.clone(); + + let subscription = Subscription { + unsubscribe: Some(Box::new(move || { + let mut lock = this.lock().unwrap(); + let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else { + // remove was called with this emitter_key + return; + }; + + if let Some(subscribers) = subscribers { + subscribers.remove(&subscriber_id); + if subscribers.is_empty() { + lock.subscribers.remove(&emitter_key); + } + return; + } + + // We didn't manage to remove the subscription, which means it was dropped + // while invoking the callback. Mark it as dropped so that we can remove it + // later. + lock.dropped_subscribers + .insert((emitter_key, subscriber_id)); + })), + }; + (subscription, move || active.store(true, Ordering::Relaxed)) + } + + pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator { + let mut lock = self.0.lock().unwrap(); + let subscribers = lock.subscribers.remove(emitter); + subscribers + .unwrap_or_default() + .map(|s| s.into_values()) + .into_iter() + .flatten() + .filter_map(|subscriber| { + if subscriber.active.load(Ordering::Relaxed) { + Some(subscriber.callback) + } else { + None + } + }) + } + + /// Call the given callback for each subscriber to the given emitter. + /// If the callback returns false, the subscriber is removed. + pub fn retain(&self, emitter: &EmitterKey, mut f: F) + where + F: FnMut(&mut Callback) -> bool, + { + let Some(mut subscribers) = self + .0 + .lock() + .unwrap() + .subscribers + .get_mut(emitter) + .and_then(|s| s.take()) + else { + return; + }; + + subscribers.retain(|_, subscriber| { + if subscriber.active.load(Ordering::Relaxed) { + f(&mut subscriber.callback) + } else { + true + } + }); + let mut lock = self.0.lock().unwrap(); + + // Add any new subscribers that were added while invoking the callback. + if let Some(Some(new_subscribers)) = lock.subscribers.remove(emitter) { + subscribers.extend(new_subscribers); + } + + // Remove any dropped subscriptions that were dropped while invoking the callback. + for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) { + debug_assert_eq!(*emitter, dropped_emitter); + subscribers.remove(&dropped_subscription_id); + } + + if !subscribers.is_empty() { + lock.subscribers.insert(emitter.clone(), Some(subscribers)); + } + } + + pub fn is_empty(&self) -> bool { + self.0 + .lock() + .unwrap() + .subscribers + .iter() + .all(|x| match &x.1 { + Some(x) => x.is_empty(), + None => true, + }) + } +} + +fn post_inc(next_subscriber_id: &mut usize) -> usize { + let ans = *next_subscriber_id; + *next_subscriber_id += 1; + ans +} + +/// A handle to a subscription created by GPUI. When dropped, the subscription +/// is cancelled and the callback will no longer be invoked. +#[must_use] +pub struct Subscription { + unsubscribe: Option>, +} + +impl Subscription { + /// Creates a new subscription with a callback that gets invoked when + /// this subscription is dropped. + pub fn new(unsubscribe: impl 'static + FnOnce()) -> Self { + Self { + unsubscribe: Some(Box::new(unsubscribe)), + } + } + + /// Detaches the subscription from this handle. The callback will + /// continue to be invoked until the views or models it has been + /// subscribed to are dropped + pub fn detach(mut self) { + self.unsubscribe.take(); + } +} + +impl Drop for Subscription { + fn drop(&mut self) { + if let Some(unsubscribe) = self.unsubscribe.take() { + unsubscribe(); + } + } +} diff --git a/crates/loro-wasm/src/lib.rs b/crates/loro-wasm/src/lib.rs index e2e7fc53..efbbbef7 100644 --- a/crates/loro-wasm/src/lib.rs +++ b/crates/loro-wasm/src/lib.rs @@ -1120,6 +1120,25 @@ impl Loro { self.0.unsubscribe(SubID::from_u32(subscription)) } + /// Subscribe the updates from local edits + #[wasm_bindgen(js_name = "subscribeLocalUpdates", skip_typescript)] + pub fn subscribe_local_updates(&self, f: js_sys::Function) -> JsValue { + let observer = observer::Observer::new(f); + let mut sub = Some(self.0.subscribe_local_update(Box::new(move |e| { + let arr = js_sys::Uint8Array::new_with_length(e.len() as u32); + arr.copy_from(e); + if let Err(e) = observer.call1(&arr.into()) { + console_error!("Error: {:?}", e); + } + }))); + + let closure = Closure::wrap(Box::new(move || { + drop(sub.take()); + }) as Box); + + closure.into_js_value() + } + /// Debug the size of the history #[wasm_bindgen(js_name = "debugHistory")] pub fn debug_history(&self) { @@ -4121,6 +4140,51 @@ interface Loro { * ``` */ getContainerById(id: ContainerID): Container; + + /** + * Subscribe to updates from local edits. + * + * This method allows you to listen for local changes made to the document. + * It's useful for syncing changes with other instances or saving updates. + * + * @param f - A callback function that receives a Uint8Array containing the update data. + * @returns A function to unsubscribe from the updates. + * + * @example + * ```ts + * const loro = new Loro(); + * const text = loro.getText("text"); + * + * const unsubscribe = loro.subscribeLocalUpdates((update) => { + * console.log("Local update received:", update); + * // You can send this update to other Loro instances + * }); + * + * text.insert(0, "Hello"); + * loro.commit(); + * + * // Later, when you want to stop listening: + * unsubscribe(); + * ``` + * + * @example + * ```ts + * const loro1 = new Loro(); + * const loro2 = new Loro(); + * + * // Set up two-way sync + * loro1.subscribeLocalUpdates((updates) => { + * loro2.import(updates); + * }); + * + * loro2.subscribeLocalUpdates((updates) => { + * loro1.import(updates); + * }); + * + * // Now changes in loro1 will be reflected in loro2 and vice versa + * ``` + */ + subscribeLocalUpdates(f: (bytes: Uint8Array) => void): () => void } /** diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index 1c3039fb..a69ab6eb 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -13,6 +13,7 @@ use loro_internal::encoding::ImportBlobMetadata; use loro_internal::handler::HandlerTrait; use loro_internal::handler::ValueOrHandler; use loro_internal::json::JsonChange; +use loro_internal::obs::LocalUpdateCallback; use loro_internal::undo::{OnPop, OnPush}; use loro_internal::version::ImVersionVector; use loro_internal::DocState; @@ -51,6 +52,7 @@ pub use loro_internal::oplog::FrontiersNotIncluded; pub use loro_internal::undo; pub use loro_internal::version::{Frontiers, VersionVector}; pub use loro_internal::ApplyDiff; +pub use loro_internal::Subscription; pub use loro_internal::UndoManager as InnerUndoManager; pub use loro_internal::{loro_value, to_value}; pub use loro_internal::{LoroError, LoroResult, LoroValue, ToJson}; @@ -577,6 +579,11 @@ impl LoroDoc { self.doc.unsubscribe(id) } + /// Subscribe the local update of the document. + pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription { + self.doc.subscribe_local_update(callback) + } + /// Estimate the size of the document states in memory. #[inline] pub fn log_estimate_size(&self) { @@ -668,6 +675,11 @@ impl LoroDoc { self.doc.export_fast_snapshot() } + /// Export the document in the given mode. + pub fn export(&self, mode: ExportMode) -> Vec { + self.doc.export(mode) + } + /// Analyze the container info of the doc /// /// This is used for development and debugging. It can be slow. @@ -679,11 +691,6 @@ impl LoroDoc { pub fn get_path_to_container(&self, id: &ContainerID) -> Option> { self.doc.get_path_to_container(id) } - - /// Export the document in the given mode. - pub fn export(&self, mode: ExportMode) -> Vec { - self.doc.export(mode) - } } /// It's used to prevent the user from implementing the trait directly. diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index 4c5463c6..9cd8a487 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -10,7 +10,7 @@ use loro::{ use loro_internal::{handler::TextDelta, id::ID, vv, LoroResult}; use rand::{Rng, SeedableRng}; use serde_json::json; -use tracing::{trace, trace_span}; +use tracing::{info, trace, trace_span}; mod integration_test; @@ -933,7 +933,9 @@ fn new_update_encode_mode() { let doc2 = LoroDoc::new(); // Export updates from doc and import to doc2 - let updates = doc.export(loro::ExportMode::Updates(&Default::default())); + let updates = doc.export(loro::ExportMode::Updates { + from: &Default::default(), + }); doc2.import(&updates).unwrap(); // Check equality @@ -951,7 +953,9 @@ fn new_update_encode_mode() { doc2.commit(); // Export updates from doc2 and import to doc - let updates2 = doc2.export(loro::ExportMode::Updates(&doc.oplog_vv())); + let updates2 = doc2.export(loro::ExportMode::Updates { + from: &doc.oplog_vv(), + }); doc.import(&updates2).unwrap(); // Check equality after syncing back @@ -1029,12 +1033,16 @@ fn test_gc_sync() { assert_eq!(trim_end, 10); apply_random_ops(&new_doc, 1234, 5); - let updates = new_doc.export(loro::ExportMode::Updates(&doc.oplog_vv())); + let updates = new_doc.export(loro::ExportMode::Updates { + from: &doc.oplog_vv(), + }); doc.import(&updates).unwrap(); assert_eq!(doc.get_deep_value(), new_doc.get_deep_value()); apply_random_ops(&doc, 11, 5); - let updates = doc.export(loro::ExportMode::Updates(&new_doc.oplog_vv())); + let updates = doc.export(loro::ExportMode::Updates { + from: &new_doc.oplog_vv(), + }); new_doc.import(&updates).unwrap(); assert_eq!(doc.get_deep_value(), new_doc.get_deep_value()); } @@ -1211,6 +1219,49 @@ fn test_map_checkout_on_trimmed_doc() { } #[test] +fn test_loro_export_local_updates() { + use std::sync::{Arc, Mutex}; + + let doc = LoroDoc::new(); + let text = doc.get_text("text"); + let updates = Arc::new(Mutex::new(Vec::new())); + + let updates_clone = updates.clone(); + let subscription = doc.subscribe_local_update(Box::new(move |bytes: &[u8]| { + updates_clone.lock().unwrap().push(bytes.to_vec()); + })); + + // Make some changes + text.insert(0, "Hello").unwrap(); + doc.commit(); + text.insert(5, " world").unwrap(); + doc.commit(); + + // Check that updates were recorded + { + let recorded_updates = updates.lock().unwrap(); + assert_eq!(recorded_updates.len(), 2); + + // Verify the content of the updates + let doc_b = LoroDoc::new(); + doc_b.import(&recorded_updates[0]).unwrap(); + assert_eq!(doc_b.get_text("text").to_string(), "Hello"); + + doc_b.import(&recorded_updates[1]).unwrap(); + assert_eq!(doc_b.get_text("text").to_string(), "Hello world"); + } + + { + // Test that the subscription can be dropped + drop(subscription); + // Make another change + text.insert(11, "!").unwrap(); + doc.commit(); + // Check that no new update was recorded + assert_eq!(updates.lock().unwrap().len(), 2); + } +} + fn test_movable_list_checkout_on_trimmed_doc() -> LoroResult<()> { let doc = LoroDoc::new(); let list = doc.get_movable_list("list"); diff --git a/loro-js/tests/event.test.ts b/loro-js/tests/event.test.ts index 65d74aec..49e059e0 100644 --- a/loro-js/tests/event.test.ts +++ b/loro-js/tests/event.test.ts @@ -372,6 +372,124 @@ describe("event", () => { await oneMs(); expect(triggered).toBeTruthy(); }); + + describe("local updates events", () => { + it("basic", () => { + const loro = new Loro(); + const text = loro.getText("text"); + let updateReceived = false; + + const unsubscribe = loro.subscribeLocalUpdates((update) => { + updateReceived = true; + expect(update).toBeInstanceOf(Uint8Array); + expect(update.length).toBeGreaterThan(0); + }); + + text.insert(0, "Hello"); + loro.commit(); + + expect(updateReceived).toBe(true); + + // Test unsubscribe + updateReceived = false; + unsubscribe(); + + text.insert(5, " World"); + loro.commit(); + + expect(updateReceived).toBe(false); + }); + + it("multiple subscribers", () => { + const loro = new Loro(); + const text = loro.getText("text"); + let count1 = 0; + let count2 = 0; + + const unsubscribe1 = loro.subscribeLocalUpdates(() => { + count1++; + }); + + const unsubscribe2 = loro.subscribeLocalUpdates(() => { + count2++; + }); + + text.insert(0, "Hello"); + loro.commit(); + + expect(count1).toBe(1); + expect(count2).toBe(1); + + unsubscribe1(); + + text.insert(5, " World"); + loro.commit(); + + expect(count1).toBe(1); + expect(count2).toBe(2); + + unsubscribe2(); + }); + + it("updates for different containers", () => { + const loro = new Loro(); + const text = loro.getText("text"); + const list = loro.getList("list"); + const map = loro.getMap("map"); + let updates = 0; + + loro.subscribeLocalUpdates(() => { + updates++; + }); + + text.insert(0, "Hello"); + list.push("World"); + map.set("key", "value"); + loro.commit(); + + expect(updates).toBe(1); // All changes are bundled in one update + + text.insert(5, "!"); + loro.commit(); + + expect(updates).toBe(2); + }) + + it("can be used to sync", () => { + const loro1 = new Loro(); + const loro2 = new Loro(); + const text1 = loro1.getText("text"); + const text2 = loro2.getText("text"); + + loro1.subscribeLocalUpdates((updates) => { + loro2.import(updates); + }); + + loro2.subscribeLocalUpdates((updates) => { + loro1.import(updates); + }); + + text1.insert(0, "Hello"); + loro1.commit(); + + expect(text2.toString()).toBe("Hello"); + + text2.insert(5, " World"); + loro2.commit(); + + expect(text1.toString()).toBe("Hello World"); + + // Test concurrent edits + text1.insert(0, "1. "); + text2.insert(text2.length, "!"); + loro1.commit(); + loro2.commit(); + + // Both documents should converge to the same state + expect(text1.toString()).toBe("1. Hello World!"); + expect(text2.toString()).toBe("1. Hello World!"); + }) + }) }); function oneMs(): Promise {