mirror of
https://github.com/loro-dev/loro.git
synced 2025-02-05 20:17:13 +00:00
feat: subscribe for local updates (#444)
This commit is contained in:
parent
c1f0a40f4b
commit
08d53cae93
15 changed files with 797 additions and 40 deletions
3
.vscode/settings.json
vendored
3
.vscode/settings.json
vendored
|
@ -10,6 +10,7 @@
|
|||
"flate",
|
||||
"fuzzer",
|
||||
"gmax",
|
||||
"GPUI",
|
||||
"heapless",
|
||||
"Helloii",
|
||||
"Hoexllo",
|
||||
|
@ -50,7 +51,7 @@
|
|||
"DEBUG": "*"
|
||||
},
|
||||
"rust-analyzer.cargo.features": [
|
||||
"test_utils",
|
||||
// "test_utils",
|
||||
// "counter"
|
||||
],
|
||||
"editor.defaultFormatter": "rust-lang.rust-analyzer",
|
||||
|
|
|
@ -13,16 +13,18 @@ use crate::op::OpWithId;
|
|||
use crate::version::Frontiers;
|
||||
use crate::LoroDoc;
|
||||
use crate::{oplog::OpLog, LoroError, VersionVector};
|
||||
use loro_common::{IdLpSpan, LoroResult, PeerID};
|
||||
use loro_common::{IdLpSpan, IdSpan, LoroResult, PeerID};
|
||||
use num_traits::{FromPrimitive, ToPrimitive};
|
||||
use rle::{HasLength, Sliceable};
|
||||
use serde::{Deserialize, Serialize};
|
||||
pub(crate) use value::OwnedValue;
|
||||
|
||||
#[non_exhaustive]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum ExportMode<'a> {
|
||||
Snapshot,
|
||||
Updates(&'a VersionVector),
|
||||
Updates { from: &'a VersionVector },
|
||||
UpdatesInRange { spans: &'a [IdSpan] },
|
||||
GcSnapshot(&'a Frontiers),
|
||||
}
|
||||
|
||||
|
@ -301,6 +303,24 @@ pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8>
|
|||
ans
|
||||
}
|
||||
|
||||
pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
|
||||
// HEADER
|
||||
let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
|
||||
ans.extend(MAGIC_BYTES);
|
||||
let checksum = [0; 16];
|
||||
ans.extend(checksum);
|
||||
ans.extend(EncodeMode::FastUpdates.to_bytes());
|
||||
|
||||
// BODY
|
||||
fast_snapshot::encode_updates_in_range(oplog, spans, &mut ans);
|
||||
|
||||
// CHECKSUM in HEADER
|
||||
let checksum_body = &ans[20..];
|
||||
let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
|
||||
ans[16..20].copy_from_slice(&checksum.to_le_bytes());
|
||||
ans
|
||||
}
|
||||
|
||||
pub(crate) fn export_gc_snapshot(doc: &LoroDoc, f: &Frontiers) -> Vec<u8> {
|
||||
// HEADER
|
||||
let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
|
||||
|
|
|
@ -17,7 +17,7 @@ use std::io::{Read, Write};
|
|||
|
||||
use crate::{oplog::ChangeStore, LoroDoc, OpLog, VersionVector};
|
||||
use bytes::{Buf, Bytes};
|
||||
use loro_common::{LoroError, LoroResult};
|
||||
use loro_common::{IdSpan, LoroError, LoroResult};
|
||||
|
||||
use super::encode_reordered::import_changes_to_oplog;
|
||||
|
||||
|
@ -159,6 +159,22 @@ pub(crate) fn encode_updates<W: std::io::Write>(doc: &LoroDoc, vv: &VersionVecto
|
|||
);
|
||||
}
|
||||
|
||||
pub(crate) fn encode_updates_in_range<W: std::io::Write>(
|
||||
oplog: &OpLog,
|
||||
spans: &[IdSpan],
|
||||
w: &mut W,
|
||||
) {
|
||||
let bytes = oplog.export_from_fast_in_range(spans);
|
||||
_encode_snapshot(
|
||||
Snapshot {
|
||||
oplog_bytes: bytes,
|
||||
state_bytes: Bytes::new(),
|
||||
gc_bytes: Bytes::new(),
|
||||
},
|
||||
w,
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> {
|
||||
let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
|
||||
let oplog_bytes = &bytes[4..4 + oplog_len as usize];
|
||||
|
|
|
@ -22,6 +22,7 @@ pub use loro_common;
|
|||
pub use oplog::OpLog;
|
||||
pub use state::DocState;
|
||||
pub use undo::UndoManager;
|
||||
pub use utils::subscription::Subscription;
|
||||
pub mod awareness;
|
||||
pub mod cursor;
|
||||
mod kv_store;
|
||||
|
|
|
@ -28,19 +28,20 @@ use crate::{
|
|||
dag::DagUtils,
|
||||
diff_calc::DiffCalculator,
|
||||
encoding::{
|
||||
decode_snapshot, export_fast_snapshot, export_fast_updates, export_gc_snapshot,
|
||||
export_snapshot, json_schema::json::JsonSchema, parse_header_and_body, EncodeMode,
|
||||
ParsedHeaderAndBody,
|
||||
decode_snapshot, export_fast_snapshot, export_fast_updates, export_fast_updates_in_range,
|
||||
export_gc_snapshot, export_snapshot, json_schema::json::JsonSchema, parse_header_and_body,
|
||||
EncodeMode, ParsedHeaderAndBody,
|
||||
},
|
||||
event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
|
||||
handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
|
||||
id::PeerID,
|
||||
obs::{Observer, SubID, Subscriber},
|
||||
obs::{LocalUpdateCallback, Observer, SubID, Subscriber},
|
||||
op::InnerContent,
|
||||
oplog::{loro_dag::FrontiersNotIncluded, OpLog},
|
||||
state::DocState,
|
||||
txn::Transaction,
|
||||
undo::DiffBatch,
|
||||
utils::subscription::{SubscriberSet, Subscription},
|
||||
version::{Frontiers, ImVersionVector},
|
||||
HandlerTrait, InternalString, ListHandler, LoroError, MapHandler, VersionVector,
|
||||
};
|
||||
|
@ -71,6 +72,7 @@ pub struct LoroDoc {
|
|||
arena: SharedArena,
|
||||
config: Configure,
|
||||
observer: Arc<Observer>,
|
||||
local_update_subs: SubscriberSet<(), LocalUpdateCallback>,
|
||||
diff_calculator: Arc<Mutex<DiffCalculator>>,
|
||||
// when dropping the doc, the txn will be committed
|
||||
txn: Arc<Mutex<Option<Transaction>>>,
|
||||
|
@ -108,6 +110,7 @@ impl LoroDoc {
|
|||
config,
|
||||
detached: AtomicBool::new(false),
|
||||
auto_commit: AtomicBool::new(false),
|
||||
local_update_subs: SubscriberSet::new(),
|
||||
observer: Arc::new(Observer::new(arena.clone())),
|
||||
diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))),
|
||||
txn: global_txn,
|
||||
|
@ -136,6 +139,7 @@ impl LoroDoc {
|
|||
arena,
|
||||
config,
|
||||
observer: Arc::new(Observer::new(self.arena.clone())),
|
||||
local_update_subs: SubscriberSet::new(),
|
||||
diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))),
|
||||
txn,
|
||||
auto_commit: AtomicBool::new(false),
|
||||
|
@ -222,22 +226,6 @@ impl LoroDoc {
|
|||
self.detached.load(Acquire)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(super) fn from_existing(oplog: OpLog, state: DocState) -> Self {
|
||||
let obs = Observer::new(oplog.arena.clone());
|
||||
Self {
|
||||
arena: oplog.arena.clone(),
|
||||
observer: Arc::new(obs),
|
||||
config: Default::default(),
|
||||
auto_commit: AtomicBool::new(false),
|
||||
oplog: Arc::new(Mutex::new(oplog)),
|
||||
state: Arc::new(Mutex::new(state)),
|
||||
diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))),
|
||||
txn: Arc::new(Mutex::new(None)),
|
||||
detached: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn peer_id(&self) -> PeerID {
|
||||
self.state
|
||||
|
@ -377,6 +365,7 @@ impl LoroDoc {
|
|||
txn.set_msg(Some(msg.clone()));
|
||||
}
|
||||
|
||||
let id_span = txn.id_span();
|
||||
txn.commit().unwrap();
|
||||
if config.immediate_renew {
|
||||
let mut txn_guard = self.txn.try_lock().unwrap();
|
||||
|
@ -385,7 +374,7 @@ impl LoroDoc {
|
|||
}
|
||||
|
||||
if let Some(on_commit) = on_commit {
|
||||
on_commit(&self.state);
|
||||
on_commit(&self.state, &self.oplog, id_span);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -440,13 +429,27 @@ impl LoroDoc {
|
|||
);
|
||||
|
||||
let obs = self.observer.clone();
|
||||
txn.set_on_commit(Box::new(move |state| {
|
||||
let local_update_subs = self.local_update_subs.clone();
|
||||
txn.set_on_commit(Box::new(move |state, oplog, id_span| {
|
||||
let mut state = state.try_lock().unwrap();
|
||||
let events = state.take_events();
|
||||
drop(state);
|
||||
for event in events {
|
||||
obs.emit(event);
|
||||
}
|
||||
|
||||
if !local_update_subs.is_empty() {
|
||||
trace!("exporting fast updates");
|
||||
let bytes =
|
||||
{ export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) };
|
||||
trace!("exported fast updates");
|
||||
local_update_subs.retain(&(), |callback| {
|
||||
trace!("calling callback");
|
||||
callback(&bytes);
|
||||
trace!("called callback");
|
||||
true
|
||||
});
|
||||
}
|
||||
}));
|
||||
|
||||
Ok(txn)
|
||||
|
@ -1057,6 +1060,12 @@ impl LoroDoc {
|
|||
self.observer.unsubscribe(id);
|
||||
}
|
||||
|
||||
pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
|
||||
let (sub, activate) = self.local_update_subs.insert((), callback);
|
||||
activate();
|
||||
sub
|
||||
}
|
||||
|
||||
// PERF: opt
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<()> {
|
||||
|
@ -1457,7 +1466,10 @@ impl LoroDoc {
|
|||
self.commit_then_stop();
|
||||
let ans = match mode {
|
||||
ExportMode::Snapshot => export_fast_snapshot(self),
|
||||
ExportMode::Updates(vv) => export_fast_updates(self, vv),
|
||||
ExportMode::Updates { from: vv } => export_fast_updates(self, vv),
|
||||
ExportMode::UpdatesInRange { spans } => {
|
||||
export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans)
|
||||
}
|
||||
ExportMode::GcSnapshot(f) => export_gc_snapshot(self, f),
|
||||
};
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ use super::{
|
|||
event::{DiffEvent, DocDiff},
|
||||
};
|
||||
|
||||
pub type LocalUpdateCallback = Box<dyn Fn(&[u8]) + Send + Sync + 'static>;
|
||||
pub type Subscriber = Arc<dyn (for<'a> Fn(DiffEvent<'a>)) + Send + Sync>;
|
||||
|
||||
#[derive(Default)]
|
||||
|
|
|
@ -371,6 +371,11 @@ impl OpLog {
|
|||
.export_from(vv, self.vv(), self.frontiers())
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) fn export_from_fast_in_range(&self, spans: &[IdSpan]) -> Bytes {
|
||||
self.change_store.export_from_fast_in_range(spans)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<(), LoroError> {
|
||||
decode_oplog(self, data)
|
||||
|
|
|
@ -176,6 +176,33 @@ impl ChangeStore {
|
|||
new_store.encode_from(start_vv, &start_frontiers, latest_vv, latest_frontiers)
|
||||
}
|
||||
|
||||
pub(super) fn export_from_fast_in_range(&self, spans: &[IdSpan]) -> Bytes {
|
||||
let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
|
||||
let latest_vv: VersionVector = spans.iter().map(|x| x.id_last()).collect();
|
||||
let start_vv: VersionVector = spans.iter().map(|x| x.id_start()).collect();
|
||||
for span in spans {
|
||||
let mut span = *span;
|
||||
span.normalize_();
|
||||
// PERF: this can be optimized by reusing the current encoded blocks
|
||||
// In the current method, it needs to parse and re-encode the blocks
|
||||
for c in self.iter_changes(span) {
|
||||
let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len());
|
||||
let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len());
|
||||
|
||||
assert_ne!(start, end);
|
||||
let ch = c.slice(start, end);
|
||||
new_store.insert_change(ch, false);
|
||||
}
|
||||
}
|
||||
|
||||
new_store.encode_from(
|
||||
&start_vv,
|
||||
&Default::default(),
|
||||
&latest_vv,
|
||||
&Default::default(),
|
||||
)
|
||||
}
|
||||
|
||||
fn encode_from(
|
||||
&self,
|
||||
start_vv: &VersionVector,
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::{
|
|||
|
||||
use enum_as_inner::EnumAsInner;
|
||||
use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
|
||||
use loro_common::{ContainerType, IdLp, LoroResult};
|
||||
use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
|
||||
use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
|
||||
use rle::{HasLength, Mergable, RleVec};
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
|
@ -38,7 +38,8 @@ use super::{
|
|||
state::DocState,
|
||||
};
|
||||
|
||||
pub type OnCommitFn = Box<dyn FnOnce(&Arc<Mutex<DocState>>) + Sync + Send>;
|
||||
pub(crate) type OnCommitFn =
|
||||
Box<dyn FnOnce(&Arc<Mutex<DocState>>, &Arc<Mutex<OpLog>>, IdSpan) + Sync + Send>;
|
||||
|
||||
pub struct Transaction {
|
||||
global_txn: Weak<Mutex<Option<Transaction>>>,
|
||||
|
@ -371,7 +372,7 @@ impl Transaction {
|
|||
drop(state);
|
||||
drop(oplog);
|
||||
if let Some(on_commit) = self.on_commit.take() {
|
||||
on_commit(&self.state);
|
||||
on_commit(&self.state, &self.oplog, self.id_span());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -503,6 +504,11 @@ impl Transaction {
|
|||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn id_span(&self) -> IdSpan {
|
||||
IdSpan::new(self.peer, self.start_counter, self.next_counter)
|
||||
}
|
||||
|
||||
pub fn next_idlp(&self) -> IdLp {
|
||||
IdLp {
|
||||
peer: self.peer,
|
||||
|
|
|
@ -2,4 +2,5 @@ pub(crate) mod kv_wrapper;
|
|||
pub(crate) mod lazy;
|
||||
pub(crate) mod query_by_len;
|
||||
pub mod string_slice;
|
||||
pub(crate) mod subscription;
|
||||
pub(crate) mod utf16;
|
||||
|
|
427
crates/loro-internal/src/utils/subscription.rs
Normal file
427
crates/loro-internal/src/utils/subscription.rs
Normal file
|
@ -0,0 +1,427 @@
|
|||
/*
|
||||
This file is modified from the original file in the following repo:
|
||||
https://github.com/zed-industries/zed
|
||||
|
||||
|
||||
Copyright 2022 - 2024 Zed Industries, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
|
||||
|
||||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
|
||||
1. Definitions.
|
||||
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
*/
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Mutex;
|
||||
use std::{fmt::Debug, mem, sync::Arc};
|
||||
|
||||
pub(crate) struct SubscriberSet<EmitterKey, Callback>(
|
||||
Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
|
||||
);
|
||||
|
||||
impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
|
||||
fn clone(&self) -> Self {
|
||||
SubscriberSet(self.0.clone())
|
||||
}
|
||||
}
|
||||
|
||||
struct SubscriberSetState<EmitterKey, Callback> {
|
||||
subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
|
||||
dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
|
||||
next_subscriber_id: usize,
|
||||
}
|
||||
|
||||
struct Subscriber<Callback> {
|
||||
active: Arc<AtomicBool>,
|
||||
callback: Callback,
|
||||
}
|
||||
|
||||
impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
|
||||
where
|
||||
EmitterKey: 'static + Ord + Clone + Debug,
|
||||
Callback: 'static,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
Self(Arc::new(Mutex::new(SubscriberSetState {
|
||||
subscribers: Default::default(),
|
||||
dropped_subscribers: Default::default(),
|
||||
next_subscriber_id: 0,
|
||||
})))
|
||||
}
|
||||
|
||||
/// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
|
||||
/// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
|
||||
/// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
|
||||
/// to activate the [`Subscription`].
|
||||
pub fn insert(
|
||||
&self,
|
||||
emitter_key: EmitterKey,
|
||||
callback: Callback,
|
||||
) -> (Subscription, impl FnOnce()) {
|
||||
let active = Arc::new(AtomicBool::new(false));
|
||||
let mut lock = self.0.lock().unwrap();
|
||||
let subscriber_id = post_inc(&mut lock.next_subscriber_id);
|
||||
lock.subscribers
|
||||
.entry(emitter_key.clone())
|
||||
.or_default()
|
||||
.get_or_insert_with(Default::default)
|
||||
.insert(
|
||||
subscriber_id,
|
||||
Subscriber {
|
||||
active: active.clone(),
|
||||
callback,
|
||||
},
|
||||
);
|
||||
let this = self.0.clone();
|
||||
|
||||
let subscription = Subscription {
|
||||
unsubscribe: Some(Box::new(move || {
|
||||
let mut lock = this.lock().unwrap();
|
||||
let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
|
||||
// remove was called with this emitter_key
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(subscribers) = subscribers {
|
||||
subscribers.remove(&subscriber_id);
|
||||
if subscribers.is_empty() {
|
||||
lock.subscribers.remove(&emitter_key);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// We didn't manage to remove the subscription, which means it was dropped
|
||||
// while invoking the callback. Mark it as dropped so that we can remove it
|
||||
// later.
|
||||
lock.dropped_subscribers
|
||||
.insert((emitter_key, subscriber_id));
|
||||
})),
|
||||
};
|
||||
(subscription, move || active.store(true, Ordering::Relaxed))
|
||||
}
|
||||
|
||||
pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
|
||||
let mut lock = self.0.lock().unwrap();
|
||||
let subscribers = lock.subscribers.remove(emitter);
|
||||
subscribers
|
||||
.unwrap_or_default()
|
||||
.map(|s| s.into_values())
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.filter_map(|subscriber| {
|
||||
if subscriber.active.load(Ordering::Relaxed) {
|
||||
Some(subscriber.callback)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Call the given callback for each subscriber to the given emitter.
|
||||
/// If the callback returns false, the subscriber is removed.
|
||||
pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
|
||||
where
|
||||
F: FnMut(&mut Callback) -> bool,
|
||||
{
|
||||
let Some(mut subscribers) = self
|
||||
.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.subscribers
|
||||
.get_mut(emitter)
|
||||
.and_then(|s| s.take())
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
subscribers.retain(|_, subscriber| {
|
||||
if subscriber.active.load(Ordering::Relaxed) {
|
||||
f(&mut subscriber.callback)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
let mut lock = self.0.lock().unwrap();
|
||||
|
||||
// Add any new subscribers that were added while invoking the callback.
|
||||
if let Some(Some(new_subscribers)) = lock.subscribers.remove(emitter) {
|
||||
subscribers.extend(new_subscribers);
|
||||
}
|
||||
|
||||
// Remove any dropped subscriptions that were dropped while invoking the callback.
|
||||
for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
|
||||
debug_assert_eq!(*emitter, dropped_emitter);
|
||||
subscribers.remove(&dropped_subscription_id);
|
||||
}
|
||||
|
||||
if !subscribers.is_empty() {
|
||||
lock.subscribers.insert(emitter.clone(), Some(subscribers));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.subscribers
|
||||
.iter()
|
||||
.all(|x| match &x.1 {
|
||||
Some(x) => x.is_empty(),
|
||||
None => true,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn post_inc(next_subscriber_id: &mut usize) -> usize {
|
||||
let ans = *next_subscriber_id;
|
||||
*next_subscriber_id += 1;
|
||||
ans
|
||||
}
|
||||
|
||||
/// A handle to a subscription created by GPUI. When dropped, the subscription
|
||||
/// is cancelled and the callback will no longer be invoked.
|
||||
#[must_use]
|
||||
pub struct Subscription {
|
||||
unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
|
||||
}
|
||||
|
||||
impl Subscription {
|
||||
/// Creates a new subscription with a callback that gets invoked when
|
||||
/// this subscription is dropped.
|
||||
pub fn new(unsubscribe: impl 'static + FnOnce()) -> Self {
|
||||
Self {
|
||||
unsubscribe: Some(Box::new(unsubscribe)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Detaches the subscription from this handle. The callback will
|
||||
/// continue to be invoked until the views or models it has been
|
||||
/// subscribed to are dropped
|
||||
pub fn detach(mut self) {
|
||||
self.unsubscribe.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Subscription {
|
||||
fn drop(&mut self) {
|
||||
if let Some(unsubscribe) = self.unsubscribe.take() {
|
||||
unsubscribe();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1120,6 +1120,25 @@ impl Loro {
|
|||
self.0.unsubscribe(SubID::from_u32(subscription))
|
||||
}
|
||||
|
||||
/// Subscribe the updates from local edits
|
||||
#[wasm_bindgen(js_name = "subscribeLocalUpdates", skip_typescript)]
|
||||
pub fn subscribe_local_updates(&self, f: js_sys::Function) -> JsValue {
|
||||
let observer = observer::Observer::new(f);
|
||||
let mut sub = Some(self.0.subscribe_local_update(Box::new(move |e| {
|
||||
let arr = js_sys::Uint8Array::new_with_length(e.len() as u32);
|
||||
arr.copy_from(e);
|
||||
if let Err(e) = observer.call1(&arr.into()) {
|
||||
console_error!("Error: {:?}", e);
|
||||
}
|
||||
})));
|
||||
|
||||
let closure = Closure::wrap(Box::new(move || {
|
||||
drop(sub.take());
|
||||
}) as Box<dyn FnMut()>);
|
||||
|
||||
closure.into_js_value()
|
||||
}
|
||||
|
||||
/// Debug the size of the history
|
||||
#[wasm_bindgen(js_name = "debugHistory")]
|
||||
pub fn debug_history(&self) {
|
||||
|
@ -4121,6 +4140,51 @@ interface Loro {
|
|||
* ```
|
||||
*/
|
||||
getContainerById(id: ContainerID): Container;
|
||||
|
||||
/**
|
||||
* Subscribe to updates from local edits.
|
||||
*
|
||||
* This method allows you to listen for local changes made to the document.
|
||||
* It's useful for syncing changes with other instances or saving updates.
|
||||
*
|
||||
* @param f - A callback function that receives a Uint8Array containing the update data.
|
||||
* @returns A function to unsubscribe from the updates.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* const loro = new Loro();
|
||||
* const text = loro.getText("text");
|
||||
*
|
||||
* const unsubscribe = loro.subscribeLocalUpdates((update) => {
|
||||
* console.log("Local update received:", update);
|
||||
* // You can send this update to other Loro instances
|
||||
* });
|
||||
*
|
||||
* text.insert(0, "Hello");
|
||||
* loro.commit();
|
||||
*
|
||||
* // Later, when you want to stop listening:
|
||||
* unsubscribe();
|
||||
* ```
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* const loro1 = new Loro();
|
||||
* const loro2 = new Loro();
|
||||
*
|
||||
* // Set up two-way sync
|
||||
* loro1.subscribeLocalUpdates((updates) => {
|
||||
* loro2.import(updates);
|
||||
* });
|
||||
*
|
||||
* loro2.subscribeLocalUpdates((updates) => {
|
||||
* loro1.import(updates);
|
||||
* });
|
||||
*
|
||||
* // Now changes in loro1 will be reflected in loro2 and vice versa
|
||||
* ```
|
||||
*/
|
||||
subscribeLocalUpdates(f: (bytes: Uint8Array) => void): () => void
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -13,6 +13,7 @@ use loro_internal::encoding::ImportBlobMetadata;
|
|||
use loro_internal::handler::HandlerTrait;
|
||||
use loro_internal::handler::ValueOrHandler;
|
||||
use loro_internal::json::JsonChange;
|
||||
use loro_internal::obs::LocalUpdateCallback;
|
||||
use loro_internal::undo::{OnPop, OnPush};
|
||||
use loro_internal::version::ImVersionVector;
|
||||
use loro_internal::DocState;
|
||||
|
@ -51,6 +52,7 @@ pub use loro_internal::oplog::FrontiersNotIncluded;
|
|||
pub use loro_internal::undo;
|
||||
pub use loro_internal::version::{Frontiers, VersionVector};
|
||||
pub use loro_internal::ApplyDiff;
|
||||
pub use loro_internal::Subscription;
|
||||
pub use loro_internal::UndoManager as InnerUndoManager;
|
||||
pub use loro_internal::{loro_value, to_value};
|
||||
pub use loro_internal::{LoroError, LoroResult, LoroValue, ToJson};
|
||||
|
@ -577,6 +579,11 @@ impl LoroDoc {
|
|||
self.doc.unsubscribe(id)
|
||||
}
|
||||
|
||||
/// Subscribe the local update of the document.
|
||||
pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
|
||||
self.doc.subscribe_local_update(callback)
|
||||
}
|
||||
|
||||
/// Estimate the size of the document states in memory.
|
||||
#[inline]
|
||||
pub fn log_estimate_size(&self) {
|
||||
|
@ -668,6 +675,11 @@ impl LoroDoc {
|
|||
self.doc.export_fast_snapshot()
|
||||
}
|
||||
|
||||
/// Export the document in the given mode.
|
||||
pub fn export(&self, mode: ExportMode) -> Vec<u8> {
|
||||
self.doc.export(mode)
|
||||
}
|
||||
|
||||
/// Analyze the container info of the doc
|
||||
///
|
||||
/// This is used for development and debugging. It can be slow.
|
||||
|
@ -679,11 +691,6 @@ impl LoroDoc {
|
|||
pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
|
||||
self.doc.get_path_to_container(id)
|
||||
}
|
||||
|
||||
/// Export the document in the given mode.
|
||||
pub fn export(&self, mode: ExportMode) -> Vec<u8> {
|
||||
self.doc.export(mode)
|
||||
}
|
||||
}
|
||||
|
||||
/// It's used to prevent the user from implementing the trait directly.
|
||||
|
|
|
@ -10,7 +10,7 @@ use loro::{
|
|||
use loro_internal::{handler::TextDelta, id::ID, vv, LoroResult};
|
||||
use rand::{Rng, SeedableRng};
|
||||
use serde_json::json;
|
||||
use tracing::{trace, trace_span};
|
||||
use tracing::{info, trace, trace_span};
|
||||
|
||||
mod integration_test;
|
||||
|
||||
|
@ -933,7 +933,9 @@ fn new_update_encode_mode() {
|
|||
let doc2 = LoroDoc::new();
|
||||
|
||||
// Export updates from doc and import to doc2
|
||||
let updates = doc.export(loro::ExportMode::Updates(&Default::default()));
|
||||
let updates = doc.export(loro::ExportMode::Updates {
|
||||
from: &Default::default(),
|
||||
});
|
||||
doc2.import(&updates).unwrap();
|
||||
|
||||
// Check equality
|
||||
|
@ -951,7 +953,9 @@ fn new_update_encode_mode() {
|
|||
doc2.commit();
|
||||
|
||||
// Export updates from doc2 and import to doc
|
||||
let updates2 = doc2.export(loro::ExportMode::Updates(&doc.oplog_vv()));
|
||||
let updates2 = doc2.export(loro::ExportMode::Updates {
|
||||
from: &doc.oplog_vv(),
|
||||
});
|
||||
doc.import(&updates2).unwrap();
|
||||
|
||||
// Check equality after syncing back
|
||||
|
@ -1029,12 +1033,16 @@ fn test_gc_sync() {
|
|||
assert_eq!(trim_end, 10);
|
||||
|
||||
apply_random_ops(&new_doc, 1234, 5);
|
||||
let updates = new_doc.export(loro::ExportMode::Updates(&doc.oplog_vv()));
|
||||
let updates = new_doc.export(loro::ExportMode::Updates {
|
||||
from: &doc.oplog_vv(),
|
||||
});
|
||||
doc.import(&updates).unwrap();
|
||||
assert_eq!(doc.get_deep_value(), new_doc.get_deep_value());
|
||||
|
||||
apply_random_ops(&doc, 11, 5);
|
||||
let updates = doc.export(loro::ExportMode::Updates(&new_doc.oplog_vv()));
|
||||
let updates = doc.export(loro::ExportMode::Updates {
|
||||
from: &new_doc.oplog_vv(),
|
||||
});
|
||||
new_doc.import(&updates).unwrap();
|
||||
assert_eq!(doc.get_deep_value(), new_doc.get_deep_value());
|
||||
}
|
||||
|
@ -1211,6 +1219,49 @@ fn test_map_checkout_on_trimmed_doc() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_loro_export_local_updates() {
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
let doc = LoroDoc::new();
|
||||
let text = doc.get_text("text");
|
||||
let updates = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let updates_clone = updates.clone();
|
||||
let subscription = doc.subscribe_local_update(Box::new(move |bytes: &[u8]| {
|
||||
updates_clone.lock().unwrap().push(bytes.to_vec());
|
||||
}));
|
||||
|
||||
// Make some changes
|
||||
text.insert(0, "Hello").unwrap();
|
||||
doc.commit();
|
||||
text.insert(5, " world").unwrap();
|
||||
doc.commit();
|
||||
|
||||
// Check that updates were recorded
|
||||
{
|
||||
let recorded_updates = updates.lock().unwrap();
|
||||
assert_eq!(recorded_updates.len(), 2);
|
||||
|
||||
// Verify the content of the updates
|
||||
let doc_b = LoroDoc::new();
|
||||
doc_b.import(&recorded_updates[0]).unwrap();
|
||||
assert_eq!(doc_b.get_text("text").to_string(), "Hello");
|
||||
|
||||
doc_b.import(&recorded_updates[1]).unwrap();
|
||||
assert_eq!(doc_b.get_text("text").to_string(), "Hello world");
|
||||
}
|
||||
|
||||
{
|
||||
// Test that the subscription can be dropped
|
||||
drop(subscription);
|
||||
// Make another change
|
||||
text.insert(11, "!").unwrap();
|
||||
doc.commit();
|
||||
// Check that no new update was recorded
|
||||
assert_eq!(updates.lock().unwrap().len(), 2);
|
||||
}
|
||||
}
|
||||
|
||||
fn test_movable_list_checkout_on_trimmed_doc() -> LoroResult<()> {
|
||||
let doc = LoroDoc::new();
|
||||
let list = doc.get_movable_list("list");
|
||||
|
|
|
@ -372,6 +372,124 @@ describe("event", () => {
|
|||
await oneMs();
|
||||
expect(triggered).toBeTruthy();
|
||||
});
|
||||
|
||||
describe("local updates events", () => {
|
||||
it("basic", () => {
|
||||
const loro = new Loro();
|
||||
const text = loro.getText("text");
|
||||
let updateReceived = false;
|
||||
|
||||
const unsubscribe = loro.subscribeLocalUpdates((update) => {
|
||||
updateReceived = true;
|
||||
expect(update).toBeInstanceOf(Uint8Array);
|
||||
expect(update.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
text.insert(0, "Hello");
|
||||
loro.commit();
|
||||
|
||||
expect(updateReceived).toBe(true);
|
||||
|
||||
// Test unsubscribe
|
||||
updateReceived = false;
|
||||
unsubscribe();
|
||||
|
||||
text.insert(5, " World");
|
||||
loro.commit();
|
||||
|
||||
expect(updateReceived).toBe(false);
|
||||
});
|
||||
|
||||
it("multiple subscribers", () => {
|
||||
const loro = new Loro();
|
||||
const text = loro.getText("text");
|
||||
let count1 = 0;
|
||||
let count2 = 0;
|
||||
|
||||
const unsubscribe1 = loro.subscribeLocalUpdates(() => {
|
||||
count1++;
|
||||
});
|
||||
|
||||
const unsubscribe2 = loro.subscribeLocalUpdates(() => {
|
||||
count2++;
|
||||
});
|
||||
|
||||
text.insert(0, "Hello");
|
||||
loro.commit();
|
||||
|
||||
expect(count1).toBe(1);
|
||||
expect(count2).toBe(1);
|
||||
|
||||
unsubscribe1();
|
||||
|
||||
text.insert(5, " World");
|
||||
loro.commit();
|
||||
|
||||
expect(count1).toBe(1);
|
||||
expect(count2).toBe(2);
|
||||
|
||||
unsubscribe2();
|
||||
});
|
||||
|
||||
it("updates for different containers", () => {
|
||||
const loro = new Loro();
|
||||
const text = loro.getText("text");
|
||||
const list = loro.getList("list");
|
||||
const map = loro.getMap("map");
|
||||
let updates = 0;
|
||||
|
||||
loro.subscribeLocalUpdates(() => {
|
||||
updates++;
|
||||
});
|
||||
|
||||
text.insert(0, "Hello");
|
||||
list.push("World");
|
||||
map.set("key", "value");
|
||||
loro.commit();
|
||||
|
||||
expect(updates).toBe(1); // All changes are bundled in one update
|
||||
|
||||
text.insert(5, "!");
|
||||
loro.commit();
|
||||
|
||||
expect(updates).toBe(2);
|
||||
})
|
||||
|
||||
it("can be used to sync", () => {
|
||||
const loro1 = new Loro();
|
||||
const loro2 = new Loro();
|
||||
const text1 = loro1.getText("text");
|
||||
const text2 = loro2.getText("text");
|
||||
|
||||
loro1.subscribeLocalUpdates((updates) => {
|
||||
loro2.import(updates);
|
||||
});
|
||||
|
||||
loro2.subscribeLocalUpdates((updates) => {
|
||||
loro1.import(updates);
|
||||
});
|
||||
|
||||
text1.insert(0, "Hello");
|
||||
loro1.commit();
|
||||
|
||||
expect(text2.toString()).toBe("Hello");
|
||||
|
||||
text2.insert(5, " World");
|
||||
loro2.commit();
|
||||
|
||||
expect(text1.toString()).toBe("Hello World");
|
||||
|
||||
// Test concurrent edits
|
||||
text1.insert(0, "1. ");
|
||||
text2.insert(text2.length, "!");
|
||||
loro1.commit();
|
||||
loro2.commit();
|
||||
|
||||
// Both documents should converge to the same state
|
||||
expect(text1.toString()).toBe("1. Hello World!");
|
||||
expect(text2.toString()).toBe("1. Hello World!");
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
function oneMs(): Promise<void> {
|
||||
|
|
Loading…
Reference in a new issue