feat: support commit message (#429)

* feat: support commit message

* test: refine test
This commit is contained in:
Zixuan Chen 2024-08-24 14:15:40 +08:00 committed by GitHub
parent 81515ba6d3
commit 95bec549d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 658 additions and 83 deletions

101
Cargo.lock generated
View file

@ -666,6 +666,7 @@ dependencies = [
"fxhash",
"itertools 0.12.1",
"loro 0.16.2",
"loro 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro 0.16.2 (git+https://github.com/loro-dev/loro.git?rev=90470658435ec4c62b5af59ebb82fe9e1f5aa761)",
"num_cpus",
"rand",
@ -994,6 +995,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "loro"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"either",
"enum-as-inner 0.6.0",
"generic-btree",
"loro-delta 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro-internal 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"tracing",
]
[[package]]
name = "loro"
version = "0.16.2"
@ -1025,6 +1039,22 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "loro-common"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"arbitrary",
"enum-as-inner 0.6.0",
"fxhash",
"loro-rle 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"nonmax",
"serde",
"serde_columnar",
"string_cache",
"thiserror",
]
[[package]]
name = "loro-common"
version = "0.16.2"
@ -1057,6 +1087,18 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "loro-delta"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"arrayvec",
"enum-as-inner 0.5.1",
"generic-btree",
"heapless 0.8.0",
"tracing",
]
[[package]]
name = "loro-delta"
version = "0.16.2"
@ -1123,6 +1165,41 @@ dependencies = [
"zstd",
]
[[package]]
name = "loro-internal"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"append-only-bytes",
"arref",
"either",
"enum-as-inner 0.5.1",
"enum_dispatch",
"fxhash",
"generic-btree",
"getrandom",
"im",
"itertools 0.12.1",
"leb128",
"loro-common 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro-delta 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro-rle 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro_fractional_index 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"md5",
"num",
"num-derive",
"num-traits",
"once_cell",
"postcard",
"rand",
"serde",
"serde_columnar",
"serde_json",
"smallvec",
"thiserror",
"tracing",
]
[[package]]
name = "loro-internal"
version = "0.16.2"
@ -1175,6 +1252,19 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "loro-rle"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"append-only-bytes",
"arref",
"enum-as-inner 0.6.0",
"fxhash",
"num",
"smallvec",
]
[[package]]
name = "loro-rle"
version = "0.16.2"
@ -1224,6 +1314,17 @@ dependencies = [
"smallvec",
]
[[package]]
name = "loro_fractional_index"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"imbl",
"rand",
"serde",
"smallvec",
]
[[package]]
name = "loro_fractional_index"
version = "0.16.2"

View file

@ -9,6 +9,7 @@ publish = false
[dependencies]
loro = { path = "../loro", features = ["counter"], package = "loro" }
loro-without-counter = { git = "https://github.com/loro-dev/loro.git", rev = "90470658435ec4c62b5af59ebb82fe9e1f5aa761", package = "loro", default-features = false }
loro-016 = { git = "https://github.com/loro-dev/loro.git", tag="loro-crdt@0.16.7", package = "loro" }
fxhash = { workspace = true }
enum_dispatch = { workspace = true }
enum-as-inner = { workspace = true }

View file

@ -0,0 +1,136 @@
use std::sync::Arc;
use loro::{ToJson as _, ID};
use loro_016::ToJson as _;
#[test]
fn updates_with_commit_message_can_be_imported_to_016() {
    // Build a doc on the current version, attaching a commit message to each commit.
    let doc1 = loro::LoroDoc::new();
    {
        let text = doc1.get_text("text");
        text.insert(0, "Hello, World!").unwrap();
        doc1.set_next_commit_message("Initial text insertion");
        doc1.commit();
        let tree = doc1.get_tree("tree");
        let root = tree.create(None).unwrap();
        doc1.set_next_commit_message("Added tree structure");
        doc1.commit();
        // Setting the message before the edits should behave the same as setting it after.
        doc1.set_next_commit_message("Modified text");
        text.delete(0, 5).unwrap();
        text.insert(7, "Loro").unwrap();
        doc1.commit();
        doc1.set_next_commit_message("Added another child to tree");
        tree.create(root).unwrap();
        doc1.commit();
    }
    // A 0.16 doc must still be able to import the snapshot, even though the
    // 0.16 encoding predates commit-message support.
    let doc2 = loro_016::LoroDoc::new();
    doc2.import(&doc1.export_snapshot()).unwrap();
    assert_eq!(
        doc2.get_deep_value().to_json(),
        doc1.get_deep_value().to_json()
    );
    {
        // Edits made by the 0.16 doc flow back into the current-version doc.
        doc2.get_text("text").insert(0, "123").unwrap();
        doc1.import(&doc2.export_from(&Default::default())).unwrap();
    }
    let doc3 = loro::LoroDoc::new();
    doc3.import(&doc1.export_fast_snapshot()).unwrap();
    // NOTE(review): len == 3 presumably because doc2's three inserted chars
    // merge into a single change — confirm against the block-merge rules.
    let change_from_2 = doc3.get_change(ID::new(doc2.peer_id(), 0)).unwrap();
    assert_eq!(change_from_2.len, 3);
    assert_eq!(doc3.get_deep_value(), doc1.get_deep_value());
    // The commit message must survive the round-trip through the fast snapshot.
    assert_eq!(
        doc3.get_change(ID::new(doc1.peer_id(), 0))
            .unwrap()
            .message(),
        "Initial text insertion"
    );
}
#[test]
fn snapshot_from_016_can_be_imported_in_cur_version() {
    // Forward compatibility: a snapshot produced by loro 0.16 must import
    // cleanly into the current version, and a snapshot produced afterwards by
    // the current version must import back into 0.16.
    // Create a LoroDoc using loro-016
    let doc_016 = loro_016::LoroDoc::new();
    doc_016.set_peer_id(1).unwrap();
    // Perform some operations on doc_016
    {
        let text = doc_016.get_text("text");
        text.insert(0, "Hello, Loro!").unwrap();
        doc_016.commit();
        let map = doc_016.get_map("map");
        map.insert("key", "value").unwrap();
        doc_016.commit();
        let list = doc_016.get_list("list");
        list.push(1).unwrap();
        list.push(2).unwrap();
        list.push(3).unwrap();
        doc_016.commit();
    }
    // Export a snapshot from doc_016
    let snapshot_016 = doc_016.export_snapshot();
    // Create a new LoroDoc using the current version
    let doc_current = loro::LoroDoc::new();
    // Import the snapshot from loro-016 into the current version
    doc_current.import(&snapshot_016).unwrap();
    // Verify that the imported data matches the original
    assert_eq!(
        doc_current.get_deep_value().to_json(),
        doc_016.get_deep_value().to_json()
    );
    // Perform additional operations on the current version doc
    {
        let text = doc_current.get_text("text");
        text.insert(11, " CRDT").unwrap();
        doc_current.commit();
        let map = doc_current.get_map("map");
        map.insert("new_key", "new_value").unwrap();
        doc_current.commit();
        let list = doc_current.get_list("list");
        list.push(4).unwrap();
        doc_current.commit();
    }
    // Verify that new operations work correctly on top of the imported state
    assert_eq!(
        doc_current.get_text("text").to_string(),
        "Hello, Loro CRDT!"
    );
    assert_eq!(
        doc_current
            .get_map("map")
            .get("new_key")
            .unwrap()
            .left()
            .unwrap(),
        loro::LoroValue::String(Arc::new("new_value".into()))
    );
    assert_eq!(doc_current.get_list("list").len(), 4);
    // Export a snapshot from the current version
    let snapshot_current = doc_current.export_snapshot();
    // Create another LoroDoc using loro-016 and import the snapshot from the current version
    let doc_016_reimport = loro_016::LoroDoc::new();
    doc_016_reimport.import(&snapshot_current).unwrap();
    // Verify that the reimported data in loro-016 matches the current version
    assert_eq!(
        doc_016_reimport.get_deep_value().to_json(),
        doc_current.get_deep_value().to_json()
    );
}

View file

@ -35,6 +35,7 @@ pub struct Change<O = Op> {
/// [Unix time](https://en.wikipedia.org/wiki/Unix_time)
/// It is the number of seconds that have elapsed since 00:00:00 UTC on 1 January 1970.
pub(crate) timestamp: Timestamp,
pub(crate) commit_msg: Option<Arc<str>>,
}
impl<O> Change<O> {
@ -51,6 +52,7 @@ impl<O> Change<O> {
id,
lamport,
timestamp,
commit_msg: None,
}
}
@ -88,6 +90,10 @@ impl<O> Change<O> {
pub fn deps_on_self(&self) -> bool {
self.deps.len() == 1 && self.deps[0].peer == self.id.peer
}
pub fn message(&self) -> Option<&Arc<str>> {
self.commit_msg.as_ref()
}
}
impl<O: EstimatedSize> EstimatedSize for Change<O> {
@ -149,7 +155,13 @@ impl<O> Mergable for Change<O> {
}
}
use std::fmt::Debug;
impl<O: Mergable + HasLength + HasIndex + Debug> Change<O> {
pub fn len(&self) -> usize {
self.ops.span().as_()
}
}
use std::{fmt::Debug, sync::Arc};
impl<O: Mergable + HasLength + HasIndex + Debug> HasLength for Change<O> {
fn content_len(&self) -> usize {
self.ops.span().as_()
@ -211,6 +223,7 @@ impl<O: Mergable + HasLength + HasIndex + Sliceable + HasCounter + Debug> Slicea
id: self.id.inc(from as Counter),
lamport: self.lamport + from as Lamport,
timestamp: self.timestamp,
commit_msg: self.commit_msg.clone(),
}
}
}
@ -222,11 +235,20 @@ impl DagNode for Change {
}
impl Change {
pub fn can_merge_right(&self, other: &Self) -> bool {
other.id.peer == self.id.peer
pub fn can_merge_right(&self, other: &Self, merge_interval: i64) -> bool {
if other.id.peer == self.id.peer
&& other.id.counter == self.id.counter + self.content_len() as Counter
&& other.deps.len() == 1
&& other.deps[0].peer == self.id.peer
&& other.timestamp - self.timestamp < merge_interval
&& self.commit_msg == other.commit_msg
{
debug_assert!(other.timestamp >= self.timestamp);
debug_assert!(other.lamport == self.lamport + self.len() as Lamport);
true
} else {
false
}
}
}

View file

@ -151,10 +151,18 @@ pub(crate) fn decode_updates(oplog: &mut OpLog, bytes: &[u8]) -> LoroResult<()>
let DecodedArenas {
peer_ids,
deps,
keys,
state_blob_arena: _,
..
} = arenas;
let changes = decode_changes(iter.changes, iter.start_counters, &peer_ids, deps, ops_map)?;
let changes = decode_changes(
iter.changes,
iter.start_counters,
&peer_ids,
&keys,
deps,
ops_map,
)?;
let (latest_ids, pending_changes) = import_changes_to_oplog(changes, oplog)?;
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
@ -255,12 +263,9 @@ fn decode_changes<'a>(
encoded_changes: IterableEncodedChange<'_>,
mut counters: Vec<i32>,
peer_ids: &PeerIdArena,
keys: &KeyArena,
mut deps: impl Iterator<Item = Result<EncodedDep, ColumnarError>> + 'a,
mut ops_map: std::collections::HashMap<
u64,
Vec<Op>,
std::hash::BuildHasherDefault<fxhash::FxHasher>,
>,
mut ops_map: FxHashMap<u64, Vec<Op>>,
) -> LoroResult<Vec<Change>> {
let mut changes = Vec::with_capacity(encoded_changes.size_hint().0);
for encoded_change in encoded_changes {
@ -270,7 +275,7 @@ fn decode_changes<'a>(
timestamp,
deps_len,
dep_on_self,
msg_len: _,
msg_idx_plus_one,
} = encoded_change?;
if peer_ids.peer_ids.len() <= peer_idx || counters.len() <= peer_idx {
return Err(LoroError::DecodeDataCorruptionError);
@ -284,6 +289,13 @@ fn decode_changes<'a>(
ops: Default::default(),
deps: Frontiers::with_capacity((deps_len + if dep_on_self { 1 } else { 0 }) as usize),
lamport: 0,
commit_msg: if msg_idx_plus_one == 0 {
None
} else {
let key = keys.get(msg_idx_plus_one as usize - 1).unwrap();
let s = key.to_string();
Some(Arc::from(s))
},
timestamp,
};
@ -650,12 +662,20 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: &[u8]) -> LoroResult<()> {
)?;
let DecodedArenas {
peer_ids,
keys,
deps,
state_blob_arena,
..
} = arenas;
let changes = decode_changes(iter.changes, iter.start_counters, &peer_ids, deps, ops_map)?;
let changes = decode_changes(
iter.changes,
iter.start_counters,
&peer_ids,
&keys,
deps,
ops_map,
)?;
let (new_ids, pending_changes) = import_changes_to_oplog(changes, &mut oplog)?;
for op in ops.iter_mut() {
@ -883,7 +903,7 @@ mod encode {
use fxhash::FxHashMap;
use loro_common::{ContainerType, HasId, PeerID, ID};
use rle::{HasLength, Sliceable};
use std::borrow::Cow;
use std::{borrow::Cow, ops::Deref};
use crate::{
arena::SharedArena,
@ -1032,13 +1052,19 @@ mod encode {
}
let peer_idx = registers.peer.register(&change.id.peer);
let msg_idx_plus_one = if let Some(msg) = change.commit_msg.as_ref() {
registers.key.register(&msg.deref().into()) + 1
} else {
0
};
changes.push(EncodedChange {
dep_on_self,
deps_len,
peer_idx,
len: change.atom_len(),
timestamp: change.timestamp,
msg_len: 0,
msg_idx_plus_one: msg_idx_plus_one as i32,
});
for op in change.ops().iter() {
@ -1589,7 +1615,7 @@ struct EncodedChange {
#[columnar(strategy = "BoolRle")]
dep_on_self: bool,
#[columnar(strategy = "DeltaRle")]
msg_len: i32,
msg_idx_plus_one: i32,
}
#[columnar(vec, ser, de, iterable)]

View file

@ -22,7 +22,7 @@ use crate::{
};
use super::encode_reordered::{import_changes_to_oplog, ValueRegister};
use op::{JsonOpContent, JsonSchema};
use json::{JsonOpContent, JsonSchema};
const SCHEMA_VERSION: u8 = 1;
@ -181,7 +181,7 @@ fn encode_changes(
diff_changes: &[Either<BlockChangeRef, Change>],
arena: &SharedArena,
peer_register: &mut ValueRegister<PeerID>,
) -> Vec<op::Change> {
) -> Vec<json::JsonChange> {
let mut changes = Vec::with_capacity(diff_changes.len());
for change in diff_changes.iter() {
let change: &Change = match change {
@ -212,7 +212,7 @@ fn encode_changes(
}
}
});
op::ListOp::Insert {
json::ListOp::Insert {
pos: *pos,
value: value.into(),
}
@ -220,7 +220,7 @@ fn encode_changes(
InnerListOp::Delete(DeleteSpanWithId {
id_start,
span: DeleteSpan { pos, signed_len },
}) => op::ListOp::Delete {
}) => json::ListOp::Delete {
pos: *pos,
len: *signed_len,
start_id: register_id(id_start, peer_register),
@ -241,7 +241,7 @@ fn encode_changes(
}
}
});
op::MovableListOp::Insert {
json::MovableListOp::Insert {
pos: *pos,
value: value.into(),
}
@ -249,7 +249,7 @@ fn encode_changes(
InnerListOp::Delete(DeleteSpanWithId {
id_start,
span: DeleteSpan { pos, signed_len },
}) => op::MovableListOp::Delete {
}) => json::MovableListOp::Delete {
pos: *pos,
len: *signed_len,
start_id: register_id(id_start, peer_register),
@ -258,7 +258,7 @@ fn encode_changes(
from,
elem_id: from_id,
to,
} => op::MovableListOp::Move {
} => json::MovableListOp::Move {
from: *from,
to: *to,
elem_id: register_idlp(from_id, peer_register),
@ -276,7 +276,7 @@ fn encode_changes(
} else {
value.clone()
};
op::MovableListOp::Set {
json::MovableListOp::Set {
elem_id: register_idlp(elem_id, peer_register),
value,
}
@ -294,12 +294,12 @@ fn encode_changes(
pos,
} => {
let text = String::from_utf8(slice.as_bytes().to_vec()).unwrap();
op::TextOp::Insert { pos: *pos, text }
json::TextOp::Insert { pos: *pos, text }
}
InnerListOp::Delete(DeleteSpanWithId {
id_start,
span: DeleteSpan { pos, signed_len },
}) => op::TextOp::Delete {
}) => json::TextOp::Delete {
pos: *pos,
len: *signed_len,
start_id: register_id(id_start, peer_register),
@ -310,14 +310,14 @@ fn encode_changes(
key,
value,
info,
} => op::TextOp::Mark {
} => json::TextOp::Mark {
start: *start,
end: *end,
style_key: key.to_string(),
style_value: value.clone(),
info: info.to_byte(),
},
InnerListOp::StyleEnd => op::TextOp::MarkEnd,
InnerListOp::StyleEnd => json::TextOp::MarkEnd,
_ => unreachable!(),
}),
_ => unreachable!(),
@ -337,12 +337,12 @@ fn encode_changes(
} else {
v.clone()
};
op::MapOp::Insert {
json::MapOp::Insert {
key: key.to_string(),
value,
}
} else {
op::MapOp::Delete {
json::MapOp::Delete {
key: key.to_string(),
}
})
@ -357,7 +357,7 @@ fn encode_changes(
target,
parent,
position,
} => op::TreeOp::Create {
} => json::TreeOp::Create {
target: register_tree_id(target, peer_register),
parent: parent.map(|p| register_tree_id(&p, peer_register)),
fractional_index: position.clone(),
@ -366,12 +366,12 @@ fn encode_changes(
target,
parent,
position,
} => op::TreeOp::Move {
} => json::TreeOp::Move {
target: register_tree_id(target, peer_register),
parent: parent.map(|p| register_tree_id(&p, peer_register)),
fractional_index: position.clone(),
},
TreeOp::Delete { target } => op::TreeOp::Delete {
TreeOp::Delete { target } => json::TreeOp::Delete {
target: register_tree_id(target, peer_register),
},
}),
@ -382,9 +382,9 @@ fn encode_changes(
else {
unreachable!();
};
JsonOpContent::Future(op::FutureOpWrapper {
JsonOpContent::Future(json::FutureOpWrapper {
prop: *prop,
value: op::FutureOp::Unknown((**value).clone()),
value: json::FutureOp::Unknown((**value).clone()),
})
}
#[cfg(feature = "counter")]
@ -394,22 +394,22 @@ fn encode_changes(
};
match f {
FutureInnerContent::Counter(x) => {
JsonOpContent::Future(op::FutureOpWrapper {
JsonOpContent::Future(json::FutureOpWrapper {
prop: 0,
value: op::FutureOp::Counter(super::OwnedValue::F64(*x)),
value: json::FutureOp::Counter(super::OwnedValue::F64(*x)),
})
}
_ => unreachable!(),
}
}
};
ops.push(op::JsonOp {
ops.push(json::JsonOp {
counter: *counter,
container,
content: op,
});
}
let c = op::Change {
let c = json::JsonChange {
id: register_id(&change.id, peer_register),
ops,
deps: change
@ -419,8 +419,9 @@ fn encode_changes(
.collect(),
lamport: change.lamport,
timestamp: change.timestamp,
msg: None,
msg: change.message().map(|x| x.to_string()),
};
changes.push(c);
}
changes
@ -429,12 +430,12 @@ fn encode_changes(
fn decode_changes(json: JsonSchema, arena: &SharedArena) -> LoroResult<Vec<Change>> {
let JsonSchema { peers, changes, .. } = json;
let mut ans = Vec::with_capacity(changes.len());
for op::Change {
for json::JsonChange {
id,
timestamp,
deps,
lamport,
msg: _,
msg,
ops: json_ops,
} in changes
{
@ -450,14 +451,15 @@ fn decode_changes(json: JsonSchema, arena: &SharedArena) -> LoroResult<Vec<Chang
deps: Frontiers::from_iter(deps.into_iter().map(|id| convert_id(&id, &peers))),
lamport,
ops,
commit_msg: msg.map(|x| x.into()),
};
ans.push(change);
}
Ok(ans)
}
fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResult<Op> {
let op::JsonOp {
fn decode_op(op: json::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResult<Op> {
let json::JsonOp {
counter,
container,
content,
@ -467,7 +469,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
let content = match container.container_type() {
ContainerType::Text => match content {
JsonOpContent::Text(text) => match text {
op::TextOp::Insert { pos, text } => {
json::TextOp::Insert { pos, text } => {
let (slice, result) = arena.alloc_str_with_slice(&text);
InnerContent::List(InnerListOp::InsertText {
slice,
@ -476,7 +478,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
pos,
})
}
op::TextOp::Delete {
json::TextOp::Delete {
pos,
len,
start_id: id_start,
@ -490,7 +492,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
},
}))
}
op::TextOp::Mark {
json::TextOp::Mark {
start,
end,
style_key,
@ -503,13 +505,13 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
value: style_value,
info: TextStyleInfoFlag::from_byte(info),
}),
op::TextOp::MarkEnd => InnerContent::List(InnerListOp::StyleEnd),
json::TextOp::MarkEnd => InnerContent::List(InnerListOp::StyleEnd),
},
_ => unreachable!(),
},
ContainerType::List => match content {
JsonOpContent::List(list) => match list {
op::ListOp::Insert { pos, value } => {
json::ListOp::Insert { pos, value } => {
let mut values = value.into_list().unwrap();
Arc::make_mut(&mut values).iter_mut().for_each(|v| {
if let LoroValue::Container(id) = v {
@ -524,7 +526,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
pos,
})
}
op::ListOp::Delete { pos, len, start_id } => {
json::ListOp::Delete { pos, len, start_id } => {
InnerContent::List(InnerListOp::Delete(DeleteSpanWithId {
id_start: convert_id(&start_id, peers),
span: DeleteSpan {
@ -538,7 +540,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
},
ContainerType::MovableList => match content {
JsonOpContent::MovableList(list) => match list {
op::MovableListOp::Insert { pos, value } => {
json::MovableListOp::Insert { pos, value } => {
let mut values = value.into_list().unwrap();
Arc::make_mut(&mut values).iter_mut().for_each(|v| {
if let LoroValue::Container(id) = v {
@ -553,7 +555,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
pos,
})
}
op::MovableListOp::Delete { pos, len, start_id } => {
json::MovableListOp::Delete { pos, len, start_id } => {
InnerContent::List(InnerListOp::Delete(DeleteSpanWithId {
id_start: convert_id(&start_id, peers),
span: DeleteSpan {
@ -562,7 +564,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
},
}))
}
op::MovableListOp::Move {
json::MovableListOp::Move {
from,
elem_id: from_id,
to,
@ -574,7 +576,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
to,
})
}
op::MovableListOp::Set { elem_id, mut value } => {
json::MovableListOp::Set { elem_id, mut value } => {
let elem_id = convert_idlp(&elem_id, peers);
if let LoroValue::Container(id) = &mut value {
*id = convert_container_id(id.clone(), peers);
@ -586,7 +588,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
},
ContainerType::Map => match content {
JsonOpContent::Map(map) => match map {
op::MapOp::Insert { key, mut value } => {
json::MapOp::Insert { key, mut value } => {
if let LoroValue::Container(id) = &mut value {
*id = convert_container_id(id.clone(), peers);
}
@ -595,7 +597,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
value: Some(value),
})
}
op::MapOp::Delete { key } => InnerContent::Map(MapSet {
json::MapOp::Delete { key } => InnerContent::Map(MapSet {
key: key.into(),
value: None,
}),
@ -604,7 +606,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
},
ContainerType::Tree => match content {
JsonOpContent::Tree(tree) => match tree {
op::TreeOp::Create {
json::TreeOp::Create {
target,
parent,
fractional_index,
@ -613,7 +615,7 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
parent: parent.map(|p| convert_tree_id(&p, peers)),
position: fractional_index,
})),
op::TreeOp::Move {
json::TreeOp::Move {
target,
parent,
fractional_index,
@ -622,16 +624,16 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
parent: parent.map(|p| convert_tree_id(&p, peers)),
position: fractional_index,
})),
op::TreeOp::Delete { target } => InnerContent::Tree(Arc::new(TreeOp::Delete {
json::TreeOp::Delete { target } => InnerContent::Tree(Arc::new(TreeOp::Delete {
target: convert_tree_id(&target, peers),
})),
},
_ => unreachable!(),
},
ContainerType::Unknown(_) => match content {
JsonOpContent::Future(op::FutureOpWrapper {
JsonOpContent::Future(json::FutureOpWrapper {
prop,
value: op::FutureOp::Unknown(value),
value: json::FutureOp::Unknown(value),
}) => InnerContent::Future(FutureInnerContent::Unknown {
prop,
value: Box::new(value),
@ -640,17 +642,17 @@ fn decode_op(op: op::JsonOp, arena: &SharedArena, peers: &[PeerID]) -> LoroResul
},
#[cfg(feature = "counter")]
ContainerType::Counter => {
let JsonOpContent::Future(op::FutureOpWrapper { prop: _, value }) = content else {
let JsonOpContent::Future(json::FutureOpWrapper { prop: _, value }) = content else {
unreachable!()
};
use crate::encoding::OwnedValue;
match value {
op::FutureOp::Counter(OwnedValue::F64(c))
| op::FutureOp::Unknown(OwnedValue::F64(c)) => {
json::FutureOp::Counter(OwnedValue::F64(c))
| json::FutureOp::Unknown(OwnedValue::F64(c)) => {
InnerContent::Future(FutureInnerContent::Counter(c))
}
op::FutureOp::Counter(OwnedValue::I64(c))
| op::FutureOp::Unknown(OwnedValue::I64(c)) => {
json::FutureOp::Counter(OwnedValue::I64(c))
| json::FutureOp::Unknown(OwnedValue::I64(c)) => {
InnerContent::Future(FutureInnerContent::Counter(c as f64))
}
_ => unreachable!(),
@ -688,7 +690,7 @@ impl TryFrom<String> for JsonSchema {
}
}
pub mod op {
pub mod json {
use fractional_index::FractionalIndex;
use loro_common::{ContainerID, IdLp, Lamport, LoroValue, PeerID, TreeID, ID};
@ -704,10 +706,11 @@ pub mod op {
pub start_version: Frontiers,
#[serde(with = "self::serde_impl::peer_id")]
pub peers: Vec<PeerID>,
pub changes: Vec<Change>,
pub changes: Vec<JsonChange>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Change {
pub struct JsonChange {
#[serde(with = "self::serde_impl::id")]
pub id: ID,
pub timestamp: i64,

View file

@ -65,7 +65,7 @@ pub(crate) use id::{PeerID, ID};
pub(crate) use loro_common::InternalString;
pub use container::ContainerType;
pub use encoding::json_schema::op::*;
pub use encoding::json_schema::json;
pub use loro_common::{loro_value, to_value};
#[cfg(feature = "wasm")]
pub use value::wasm;

View file

@ -6,6 +6,7 @@ use rle::HasLength;
use std::{
borrow::Cow,
cmp::Ordering,
f32::consts::E,
sync::{
atomic::{
AtomicBool,
@ -28,7 +29,7 @@ use crate::{
dag::DagUtils,
diff_calc::DiffCalculator,
encoding::{
decode_snapshot, export_fast_snapshot, export_snapshot, json_schema::op::JsonSchema,
decode_snapshot, export_fast_snapshot, export_snapshot, json_schema::json::JsonSchema,
parse_header_and_body, EncodeMode, ParsedHeaderAndBody,
},
event::{str_to_path, EventTriggerKind, Index, InternalDocDiff, Path},
@ -371,6 +372,10 @@ impl LoroDoc {
txn.set_timestamp(timestamp);
}
if let Some(msg) = config.commit_msg.as_ref() {
txn.set_msg(Some(msg.clone()));
}
txn.commit().unwrap();
if config.immediate_renew {
let mut txn_guard = self.txn.try_lock().unwrap();
@ -383,6 +388,20 @@ impl LoroDoc {
}
}
/// Set the commit message of the next commit
///
/// The message is stored on the currently active transaction. If no
/// transaction is active this is a silent no-op and the message is dropped.
/// Passing an empty string clears any previously set message.
///
/// # Panics
///
/// Panics if the transaction lock is already held (`try_lock().unwrap()`).
pub fn set_next_commit_message(&self, message: &str) {
    let mut binding = self.txn.try_lock().unwrap();
    // No active transaction: nothing to attach the message to.
    let Some(txn) = binding.as_mut() else {
        return;
    };
    if message.is_empty() {
        txn.set_msg(None)
    } else {
        txn.set_msg(Some(message.into()))
    }
}
#[inline]
pub fn renew_txn_if_auto_commit(&self) {
if self.auto_commit.load(Acquire) && !self.detached.load(Acquire) {
@ -1455,10 +1474,10 @@ impl<'a> Drop for CommitWhenDrop<'a> {
#[derive(Debug, Clone)]
pub struct CommitOptions {
origin: Option<InternalString>,
immediate_renew: bool,
timestamp: Option<Timestamp>,
commit_msg: Option<Box<str>>,
pub origin: Option<InternalString>,
pub immediate_renew: bool,
pub timestamp: Option<Timestamp>,
pub commit_msg: Option<Arc<str>>,
}
impl CommitOptions {

View file

@ -636,6 +636,7 @@ pub(crate) fn convert_change_to_remote(
deps: change.deps.clone(),
lamport: change.lamport,
timestamp: change.timestamp,
commit_msg: change.commit_msg.clone(),
}
}

View file

@ -672,6 +672,7 @@ mod mut_inner_kv {
id: change.id,
lamport: change.lamport,
timestamp: change.timestamp,
commit_msg: change.commit_msg.clone(),
};
let mut total_len = 0;
@ -744,6 +745,7 @@ mod mut_inner_kv {
id: ID::new(new_change.id.peer, ctr_end),
lamport: next_lamport,
timestamp: new_change.timestamp,
commit_msg: new_change.commit_msg.clone(),
};
self.insert_change(new_change, false);
@ -1010,8 +1012,7 @@ impl ChangesBlock {
let changes = Arc::make_mut(changes);
match changes.last_mut() {
Some(last)
if change.deps_on_self()
&& change.timestamp - last.timestamp < merge_interval
if last.can_merge_right(&change, merge_interval)
&& (!is_full
|| (change.ops.len() == 1
&& last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>

View file

@ -69,6 +69,7 @@
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::io::Write;
use std::sync::Arc;
use fractional_index::FractionalIndex;
use loro_common::{
@ -153,6 +154,7 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
let mut timestamp_encoder = DeltaRleEncoder::new();
let mut lamport_encoder = UnsignedDeltaEncoder::new(block.len() * 2 + 4);
let mut commit_msg_len_encoder = AnyRleEncoder::<u32>::new();
let mut commit_msgs = String::new();
let mut dep_self_encoder = BoolRleEncoder::new();
let mut dep_len_encoder = AnyRleEncoder::<u64>::new();
let mut encoded_deps = EncodedDeps {
@ -162,7 +164,12 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
for c in block {
timestamp_encoder.append(c.timestamp()).unwrap();
lamport_encoder.push(c.lamport() as u64);
commit_msg_len_encoder.append(0).unwrap();
if let Some(msg) = c.commit_msg.as_ref() {
commit_msg_len_encoder.append(msg.len() as u32).unwrap();
commit_msgs.push_str(msg);
} else {
commit_msg_len_encoder.append(0).unwrap();
}
let mut dep_on_self = false;
for dep in c.deps().iter() {
@ -311,7 +318,7 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
lamports: lamport_encoder.finish().0.into(),
timestamps: timestamp_encoder.finish().unwrap().into(),
commit_msg_lengths: commit_msg_len_encoder.finish().unwrap().into(),
commit_msgs: Cow::Owned(vec![]),
commit_msgs: Cow::Owned(commit_msgs.into_bytes()),
cids: container_arena.encode().into(),
keys: keys_bytes.into(),
positions: position_bytes.into(),
@ -677,6 +684,7 @@ pub fn decode_block(
first_counter,
timestamps,
commit_msg_lengths,
commit_msgs,
cids,
keys,
ops,
@ -689,7 +697,8 @@ pub fn decode_block(
return Err(LoroError::IncompatibleFutureEncodingError(version as usize));
}
let mut timestamp_decoder: DeltaRleDecoder<i64> = DeltaRleDecoder::new(&timestamps);
let _commit_msg_len_decoder = AnyRleDecoder::<u32>::new(&commit_msg_lengths);
let mut commit_msg_len_decoder = AnyRleDecoder::<u32>::new(&commit_msg_lengths);
let mut commit_msg_index = 0;
let keys = header.keys.get_or_init(|| decode_keys(&keys));
let decode_arena = ValueDecodeArena {
peers: &header.peers,
@ -711,12 +720,31 @@ pub fn decode_block(
let op_iter = encoded_ops_iters.ops;
let mut del_iter = encoded_ops_iters.delete_start_ids;
for i in 0..(n_changes as usize) {
let commit_msg: Option<Arc<str>> = {
let len = commit_msg_len_decoder.next().unwrap().unwrap();
if len == 0 {
None
} else {
let end = commit_msg_index + len;
match std::str::from_utf8(&commit_msgs[commit_msg_index as usize..end as usize]) {
Ok(s) => {
commit_msg_index = end;
Some(Arc::from(s))
}
Err(_) => {
tracing::error!("Invalid UTF8 String");
return LoroResult::Err(LoroError::DecodeDataCorruptionError);
}
}
}
};
changes.push(Change {
ops: Default::default(),
deps: header.deps_groups[i].clone(),
id: ID::new(header.peer, header.counters[i]),
lamport: header.lamports[i],
timestamp: timestamp_decoder.next().unwrap().unwrap() as Timestamp,
commit_msg,
})
}

View file

@ -57,6 +57,7 @@ pub struct Transaction {
finished: bool,
on_commit: Option<OnCommitFn>,
timestamp: Option<Timestamp>,
msg: Option<Arc<str>>,
}
impl std::fmt::Debug for Transaction {
@ -274,6 +275,7 @@ impl Transaction {
local_ops: RleVec::new(),
finished: false,
on_commit: None,
msg: None,
}
}
@ -289,6 +291,10 @@ impl Transaction {
self.timestamp = Some(time);
}
pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
self.msg = msg;
}
pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
self.on_commit = Some(f);
}
@ -321,6 +327,7 @@ impl Transaction {
self.timestamp
.unwrap_or_else(|| oplog.get_timestamp_for_next_txn()),
),
commit_msg: take(&mut self.msg),
};
let diff = if state.is_recording() {

View file

@ -17,12 +17,13 @@ use loro_internal::{
Handler, ListHandler, MapHandler, TextDelta, TextHandler, TreeHandler, ValueOrHandler,
},
id::{Counter, PeerID, TreeID, ID},
json::JsonSchema,
loro::CommitOptions,
obs::SubID,
undo::{UndoItemMeta, UndoOrRedo},
version::Frontiers,
ContainerType, DiffEvent, FxHashMap, HandlerTrait, JsonSchema, LoroDoc, LoroValue,
MovableListHandler, UndoManager as InnerUndoManager, VersionVector as InternalVersionVector,
ContainerType, DiffEvent, FxHashMap, HandlerTrait, LoroDoc, LoroValue, MovableListHandler,
UndoManager as InnerUndoManager, VersionVector as InternalVersionVector,
};
use rle::HasLength;
use serde::{Deserialize, Serialize};

View file

@ -0,0 +1,48 @@
use std::sync::Arc;
use loro_internal::{
change::{Change, Lamport, Timestamp},
id::ID,
version::Frontiers,
};
/// `Change` is a group of continuous operations that share the same id, timestamp, and commit message.
///
/// - The id of the `Change` is the id of its first op.
/// - The second op's id is `{ peer: change.id.peer, counter: change.id.counter + 1 }`
///
/// The same applies to `Lamport`:
///
/// - The lamport of the `Change` is the lamport of its first op.
/// - The second op's lamport is `change.lamport + 1`
///
/// The length of the `Change` is how many operations it contains.
#[derive(Debug, Clone)]
pub struct ChangeMeta {
    /// ID of the first op in the change.
    pub id: ID,
    /// Lamport timestamp of the first op in the change.
    pub lamport: Lamport,
    /// Wall-clock timestamp recorded when the change was committed.
    pub timestamp: Timestamp,
    /// Commit message attached to the change, if any.
    pub message: Option<Arc<str>>,
    /// The changes this change directly depends on.
    pub deps: Frontiers,
    /// Number of operations contained in the change.
    pub len: usize,
}
impl ChangeMeta {
    /// Build a `ChangeMeta` snapshot from an internal [`Change`].
    pub(super) fn from_change(c: &Change) -> Self {
        Self {
            id: c.id(),
            lamport: c.lamport(),
            timestamp: c.timestamp(),
            message: c.message().cloned(),
            deps: c.deps().clone(),
            len: c.len(),
        }
    }

    /// Returns the commit message, or `""` when none was set.
    pub fn message(&self) -> &str {
        // Idiomatic replacement for matching on `Option<&Arc<str>>`:
        // `as_deref` borrows the inner `str`, `unwrap_or` supplies the default.
        self.message.as_deref().unwrap_or("")
    }
}

View file

@ -1,6 +1,7 @@
#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
#![warn(missing_debug_implementations)]
use change_meta::ChangeMeta;
use either::Either;
use event::{DiffEvent, Subscriber};
use loro_internal::container::IntoContainerId;
@ -11,11 +12,11 @@ use loro_internal::cursor::Side;
use loro_internal::encoding::ImportBlobMetadata;
use loro_internal::handler::HandlerTrait;
use loro_internal::handler::ValueOrHandler;
use loro_internal::json::JsonChange;
use loro_internal::undo::{OnPop, OnPush};
use loro_internal::DocState;
use loro_internal::LoroDoc as InnerLoroDoc;
use loro_internal::OpLog;
use loro_internal::{
handler::Handler as InnerHandler, ListHandler as InnerListHandler,
MapHandler as InnerMapHandler, MovableListHandler as InnerMovableListHandler,
@ -25,9 +26,9 @@ use loro_internal::{
use std::cmp::Ordering;
use std::ops::Range;
use std::sync::Arc;
use tracing::info;
mod change_meta;
pub mod event;
pub use loro_internal::awareness;
pub use loro_internal::configure::Configure;
@ -39,6 +40,8 @@ pub use loro_internal::delta::{TreeDeltaItem, TreeDiff, TreeExternalDiff};
pub use loro_internal::event::Index;
pub use loro_internal::handler::TextDelta;
pub use loro_internal::id::{PeerID, TreeID, ID};
pub use loro_internal::json;
pub use loro_internal::json::JsonSchema;
pub use loro_internal::loro::CommitOptions;
pub use loro_internal::loro::DocAnalysis;
pub use loro_internal::obs::SubID;
@ -46,7 +49,6 @@ pub use loro_internal::oplog::FrontiersNotIncluded;
pub use loro_internal::undo;
pub use loro_internal::version::{Frontiers, VersionVector};
pub use loro_internal::ApplyDiff;
pub use loro_internal::JsonSchema;
pub use loro_internal::UndoManager as InnerUndoManager;
pub use loro_internal::{loro_value, to_value};
pub use loro_internal::{LoroError, LoroResult, LoroValue, ToJson};
@ -95,6 +97,24 @@ impl LoroDoc {
self.doc.config()
}
/// Get `Change` at the given id.
///
/// `Change` is a grouped continuous operations that share the same id, timestamp, commit message.
///
/// - The id of the `Change` is the id of its first op.
/// - The second op's id is `{ peer: change.id.peer, counter: change.id.counter + 1 }`
///
/// The same applies on `Lamport`:
///
/// - The lamport of the `Change` is the lamport of its first op.
/// - The second op's lamport is `change.lamport + 1`
///
/// The length of the `Change` is how many operations it contains
pub fn get_change(&self, id: ID) -> Option<ChangeMeta> {
    // Hold the oplog lock only long enough to look the change up.
    let oplog = self.doc.oplog().lock().unwrap();
    let change = oplog.get_change_at(id)?;
    Some(ChangeMeta::from_change(&change))
}
/// Decodes the metadata for an imported blob from the provided bytes.
#[inline]
pub fn decode_import_blob_meta(bytes: &[u8]) -> LoroResult<ImportBlobMetadata> {
@ -302,6 +322,11 @@ impl LoroDoc {
self.doc.commit_with(options)
}
/// Set commit message for the current uncommitted changes
///
/// The message is attached to the change created by the next commit and is
/// carried along when that change is exported to other peers.
pub fn set_next_commit_message(&self, msg: &str) {
    self.doc.set_next_commit_message(msg)
}
/// Whether the document is in detached mode, where the [loro_internal::DocState] is not
/// synchronized with the latest version of the [loro_internal::OpLog].
#[inline]

View file

@ -0,0 +1,156 @@
use loro::{CommitOptions, LoroDoc, VersionVector, ID};
#[test]
fn test_commit_message() {
    let doc = LoroDoc::new();
    let text = doc.get_text("text");
    text.insert(0, "hello").unwrap();
    doc.commit_with(CommitOptions::new().commit_msg("edits"));
    let change = doc.get_change(ID::new(doc.peer_id(), 0)).unwrap();
    assert_eq!(change.message(), "edits");

    // The commit message can be synced to other peers as well.
    let doc2 = LoroDoc::new();
    doc2.import(&doc.export_snapshot()).unwrap();
    // BUG FIX: the original asserted on `doc` (the exporter), which never
    // exercised syncing at all — the lookup must happen on the importing peer.
    let change = doc2.get_change(ID::new(doc.peer_id(), 1)).unwrap();
    assert_eq!(change.message(), "edits");
}
#[test]
fn changes_with_commit_message_won_t_merge() {
    // Two consecutive commits carrying different messages must remain
    // separate changes rather than being merged into one.
    let doc = LoroDoc::new();
    let text = doc.get_text("text");

    text.insert(0, "hello").unwrap();
    doc.commit_with(CommitOptions::new().commit_msg("edit 1"));
    text.insert(5, " world").unwrap();
    doc.commit_with(CommitOptions::new().commit_msg("edit 2"));
    assert_eq!(text.to_string(), "hello world");

    let peer = doc.peer_id();
    // Counter 1 lies inside the first change, counter 6 inside the second.
    let first = doc.get_change(ID::new(peer, 1)).unwrap();
    let second = doc.get_change(ID::new(peer, 6)).unwrap();
    assert_eq!(first.message(), "edit 1");
    assert_eq!(second.message(), "edit 2");
}
#[test]
fn test_syncing_commit_message() {
    // A commit message set on one peer should arrive on another peer
    // via an incremental update export.
    let sender = LoroDoc::new();
    sender.set_peer_id(1).unwrap();
    let receiver = LoroDoc::new();
    receiver.set_peer_id(2).unwrap();

    sender.get_text("text").insert(0, "hello").unwrap();
    sender.commit_with(CommitOptions::new().commit_msg("edit on doc1"));

    // Ship every change from the sender to the receiver.
    let updates = sender.export_from(&Default::default());
    receiver.import(&updates).unwrap();

    // The commit message was synced...
    let synced = receiver.get_change(ID::new(1, 1)).unwrap();
    assert_eq!(synced.message(), "edit on doc1");
    // ...and so was the text content.
    assert_eq!(receiver.get_text("text").to_string(), "hello");
}
#[test]
fn test_commit_message_sync_via_snapshot() {
    // Commit messages must be preserved when a fresh peer is bootstrapped
    // from a full snapshot.
    let source = LoroDoc::new();
    source.set_peer_id(1).unwrap();
    let text = source.get_text("text");

    text.insert(0, "hello").unwrap();
    source.commit_with(CommitOptions::new().commit_msg("first edit"));
    text.insert(5, " world").unwrap();
    source.commit_with(CommitOptions::new().commit_msg("second edit"));

    // Boot a brand-new doc directly from the snapshot bytes.
    let replica = LoroDoc::new();
    replica.import(&source.export_snapshot()).unwrap();

    // Both messages survived the snapshot round-trip.
    assert_eq!(
        replica.get_change(ID::new(1, 1)).unwrap().message(),
        "first edit"
    );
    assert_eq!(
        replica.get_change(ID::new(1, 6)).unwrap().message(),
        "second edit"
    );
    // The text content did too.
    assert_eq!(replica.get_text("text").to_string(), "hello world");
}
#[test]
fn test_commit_message_sync_via_fast_snapshot() {
    // Round-trips commit messages through the *fast* snapshot format,
    // in both directions (doc1 -> doc2, then doc2 -> doc1).
    let doc1 = LoroDoc::new();
    let doc2 = LoroDoc::new();
    doc1.set_peer_id(1).unwrap();
    let text1 = doc1.get_text("text");
    text1.insert(0, "hello").unwrap();
    doc1.commit_with(CommitOptions::new().commit_msg("first edit"));
    text1.insert(5, " world").unwrap();
    doc1.commit_with(CommitOptions::new().commit_msg("second edit"));
    let snapshot = doc1.export_fast_snapshot();
    doc2.import(&snapshot).unwrap();
    // Verify the commit messages were preserved in the snapshot
    // (counter 1 is inside the first change, counter 6 inside the second).
    let change1 = doc2.get_change(ID::new(1, 1)).unwrap();
    let change2 = doc2.get_change(ID::new(1, 6)).unwrap();
    assert_eq!(change1.message(), "first edit");
    assert_eq!(change2.message(), "second edit");
    // Verify the text content was also preserved
    let text2 = doc2.get_text("text");
    assert_eq!(text2.to_string(), "hello world");
    // Now go the other way: edit on doc2, attach a message via
    // set_next_commit_message, and sync back to doc1.
    text2.delete(0, 10).unwrap();
    doc2.set_next_commit_message("From text2");
    doc1.import(&doc2.export_fast_snapshot()).unwrap();
    let c = doc1.get_change(ID::new(doc2.peer_id(), 0)).unwrap();
    assert_eq!(c.message(), "From text2");
}
#[test]
fn test_commit_message_json_updates() {
    // Commit messages must survive a round-trip through the JSON updates format.
    let src = LoroDoc::new();
    let text = src.get_text("text");
    text.insert(0, "hello").unwrap();
    src.commit_with(CommitOptions::new().commit_msg("first edit"));
    text.insert(5, " world").unwrap();
    src.commit_with(CommitOptions::new().commit_msg("second edit"));

    // Export everything: from the empty version up to the current oplog version.
    let json_updates = src.export_json_updates(&VersionVector::new(), &src.oplog_vv());

    let dst = LoroDoc::new();
    dst.import_json_updates(json_updates).unwrap();

    // Both messages were preserved in the JSON updates.
    let peer = src.peer_id();
    let first = dst.get_change(ID::new(peer, 1)).unwrap();
    let second = dst.get_change(ID::new(peer, 6)).unwrap();
    assert_eq!(first.message(), "first edit");
    assert_eq!(second.message(), "second edit");
    // The text content round-trips too.
    assert_eq!(dst.get_text("text").to_string(), "hello world");
}