feat: subscribe for container events

This commit is contained in:
Zixuan Chen 2023-07-26 18:56:03 +08:00
parent 1ed8ad05be
commit 470d23a198
7 changed files with 168 additions and 25 deletions

View file

@ -2,9 +2,13 @@ use criterion::{criterion_group, criterion_main, Criterion};
#[cfg(feature = "test_utils")]
mod run {
use std::sync::{atomic::AtomicU32, Arc};
use super::*;
use bench_utils::TextAction;
use criterion::black_box;
use loro_internal::refactor::loro::LoroDoc;
use tracing::instrument::WithSubscriber;
pub fn b4(c: &mut Criterion) {
let actions = bench_utils::get_automerge_actions();
@ -23,6 +27,21 @@ mod run {
})
});
b.bench_function("B4 Obs", |b| {
b.iter(|| {
let loro = LoroDoc::default();
let text = loro.get_text("text");
loro.subscribe_deep(Arc::new(move |event| {
black_box(event);
}));
let mut txn = loro.txn().unwrap();
for TextAction { pos, ins, del } in actions.iter() {
text.delete(&mut txn, *pos, *del);
text.insert(&mut txn, *pos, ins);
}
})
});
b.bench_function("B4 encode snapshot", |b| {
let loro = LoroDoc::default();
let text = loro.get_text("text");
@ -165,6 +184,24 @@ mod run {
})
});
b.bench_function("B4 One Op One Txn Obs", |b| {
b.iter(|| {
let loro = LoroDoc::default();
let text = loro.get_text("text");
loro.subscribe_deep(Arc::new(move |event| {
black_box(event);
}));
{
for TextAction { pos, ins, del } in actions.iter() {
let mut txn = loro.txn().unwrap();
text.delete(&mut txn, *pos, *del);
text.insert(&mut txn, *pos, ins);
txn.commit().unwrap();
}
}
})
});
b.bench_function("B4DirectSync", |b| {
b.iter(|| {
let loro = LoroDoc::default();

View file

@ -7,6 +7,7 @@ use std::{
use arbitrary::Arbitrary;
use debug_log::debug_dbg;
use enum_as_inner::EnumAsInner;
use fxhash::FxHashMap;
use tabled::{TableIteratorExt, Tabled};
#[allow(unused_imports)]
@ -56,9 +57,9 @@ pub enum Action {
struct Actor {
loro: LoroDoc,
value_tracker: Arc<Mutex<LoroValue>>,
// map_tracker: Arc<Mutex<FxHashMap<String, LoroValue>>>,
// list_tracker: Arc<Mutex<Vec<LoroValue>>>,
// text_tracker: Arc<Mutex<String>>,
map_tracker: Arc<Mutex<FxHashMap<String, LoroValue>>>,
list_tracker: Arc<Mutex<Vec<LoroValue>>>,
text_tracker: Arc<Mutex<String>>,
map_containers: Vec<MapHandler>,
list_containers: Vec<ListHandler>,
text_containers: Vec<TextHandler>,
@ -71,9 +72,9 @@ impl Actor {
let mut actor = Actor {
loro: app,
value_tracker: Arc::new(Mutex::new(LoroValue::Map(Default::default()))),
// map_tracker: Default::default(),
// list_tracker: Default::default(),
// text_tracker: Default::default(),
map_tracker: Default::default(),
list_tracker: Default::default(),
text_tracker: Default::default(),
map_containers: Default::default(),
list_containers: Default::default(),
text_containers: Default::default(),
@ -89,6 +90,94 @@ impl Actor {
);
}));
let text = Arc::clone(&actor.text_tracker);
actor.loro.subscribe(
&ContainerID::new_root("text", ContainerType::Text),
Arc::new(move |event| {
if event.from_children {
return;
}
let mut text = text.lock().unwrap();
match &event.container.diff {
Diff::Text(delta) => {
let mut index = 0;
for item in delta.iter() {
match item {
DeltaItem::Retain { len, meta: _ } => {
index += len;
}
DeltaItem::Insert { value, meta: _ } => {
text.insert_str(index, value);
index += value.len();
}
DeltaItem::Delete { len, .. } => {
text.drain(index..index + *len);
}
}
}
}
_ => unreachable!(),
}
}),
);
let map = Arc::clone(&actor.map_tracker);
actor.loro.subscribe(
&ContainerID::new_root("map", ContainerType::Map),
Arc::new(move |event| {
if event.from_children {
return;
}
let mut map = map.lock().unwrap();
if let Diff::NewMap(map_diff) = &event.container.diff {
for (key, value) in map_diff.updated.iter() {
match &value.value {
Some(value) => {
map.insert(key.to_string(), value.clone());
}
None => {
map.remove(&key.to_string());
}
}
}
} else {
debug_dbg!(&event.container);
unreachable!()
}
}),
);
let list = Arc::clone(&actor.list_tracker);
actor.loro.subscribe(
&ContainerID::new_root("list", ContainerType::List),
Arc::new(move |event| {
if event.from_children {
return;
}
let mut list = list.lock().unwrap();
if let Diff::List(delta) = &event.container.diff {
let mut index = 0;
for item in delta.iter() {
match item {
DeltaItem::Retain { len, meta: _ } => {
index += len;
}
DeltaItem::Insert { value, meta: _ } => {
for v in value {
list.insert(index, v.clone());
index += 1;
}
}
DeltaItem::Delete { len, .. } => {
list.drain(index..index + *len);
}
}
}
} else {
unreachable!()
}
}),
);
actor
.text_containers
.push(actor.loro.txn().unwrap().get_text("text"));

View file

@ -169,10 +169,14 @@ impl SharedArena {
}
/// Call `f` on each ancestor of `container`, including `container` itself.
pub fn with_ancestors(&self, container: ContainerIdx, mut f: impl FnMut(ContainerIdx)) {
///
/// f(ContainerIdx, is_first)
pub fn with_ancestors(&self, container: ContainerIdx, mut f: impl FnMut(ContainerIdx, bool)) {
let mut container = Some(container);
let mut is_first = true;
while let Some(c) = container {
f(c);
f(c, is_first);
is_first = false;
container = self.get_parent(c);
}
}

View file

@ -19,6 +19,8 @@ pub struct ContainerDiff {
#[derive(Debug)]
pub struct DiffEvent<'a> {
/// whether the event comes from the children of the container.
pub from_children: bool,
pub container: &'a ContainerDiff,
pub doc: &'a DocDiff,
}

View file

@ -4,7 +4,7 @@ use std::{
};
use debug_log::debug_dbg;
use loro_common::{ContainerType, LoroValue};
use loro_common::{ContainerID, ContainerType, LoroValue};
use crate::{
container::{registry::ContainerIdx, ContainerIdRaw},
@ -273,7 +273,7 @@ impl LoroDoc {
}
}
pub(crate) fn subscribe_deep(&self, callback: Subscriber) -> SubID {
pub fn subscribe_deep(&self, callback: Subscriber) -> SubID {
let mut state = self.state.lock().unwrap();
if !state.is_recording() {
state.start_recording();
@ -281,6 +281,15 @@ impl LoroDoc {
self.observer.subscribe_deep(callback)
}
pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> SubID {
let mut state = self.state.lock().unwrap();
if !state.is_recording() {
state.start_recording();
}
self.observer.subscribe(container_id, callback)
}
}
impl Default for LoroDoc {

View file

@ -81,26 +81,29 @@ impl Observer {
let mut inner = self.take_inner();
for container_diff in doc_diff.diff.iter() {
self.arena.with_ancestors(container_diff.idx, |ancestor| {
if let Some(subs) = inner.containers.get_mut(&ancestor) {
subs.retain(|sub| match inner.subscribers.get_mut(sub) {
Some(f) => {
f(DiffEvent {
container: container_diff,
doc: doc_diff,
});
true
}
None => false,
});
}
});
self.arena
.with_ancestors(container_diff.idx, |ancestor, is_self| {
if let Some(subs) = inner.containers.get_mut(&ancestor) {
subs.retain(|sub| match inner.subscribers.get_mut(sub) {
Some(f) => {
f(DiffEvent {
from_children: !is_self,
container: container_diff,
doc: doc_diff,
});
true
}
None => false,
});
}
});
inner
.root
.retain(|sub| match inner.subscribers.get_mut(sub) {
Some(f) => {
f(DiffEvent {
from_children: true,
container: container_diff,
doc: doc_diff,
});

View file

@ -130,7 +130,6 @@ pub trait ApplyDiff {
impl ApplyDiff for LoroValue {
fn apply_diff(&mut self, diff: &[Diff]) {
debug_dbg!(&self, diff);
match self {
LoroValue::String(value) => {
let mut s = value.to_string();