mirror of
https://github.com/loro-dev/loro.git
synced 2025-02-05 20:17:13 +00:00
feat: return import status in import_batch method (#573)
* feat: return import status in import_batch method * feat(wasm): add importUpdateBatch returned status support * chore: add changeset update * chore: update ffi return type
This commit is contained in:
parent
46f578bcd0
commit
01fccc5a7d
8 changed files with 175 additions and 19 deletions
5
.changeset/poor-fishes-fry.md
Normal file
5
.changeset/poor-fishes-fry.md
Normal file
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
"loro-crdt": minor
|
||||
---
|
||||
|
||||
Return ImportStatus in the import_batch method
|
|
@ -177,8 +177,9 @@ impl LoroDoc {
|
|||
///
|
||||
/// The data can be in arbitrary order. The import result will be the same.
|
||||
#[inline]
|
||||
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<()> {
|
||||
self.doc.import_batch(bytes)
|
||||
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> Result<ImportStatus, LoroError> {
|
||||
let status = self.doc.import_batch(bytes)?;
|
||||
Ok(status.into())
|
||||
}
|
||||
|
||||
pub fn get_movable_list(&self, id: Arc<dyn ContainerIdLike>) -> Arc<LoroMovableList> {
|
||||
|
|
|
@ -9,7 +9,7 @@ use rle::HasLength;
|
|||
use std::{
|
||||
borrow::Cow,
|
||||
cmp::Ordering,
|
||||
collections::BinaryHeap,
|
||||
collections::{hash_map::Entry, BinaryHeap},
|
||||
ops::ControlFlow,
|
||||
sync::{
|
||||
atomic::{
|
||||
|
@ -48,7 +48,7 @@ use crate::{
|
|||
txn::Transaction,
|
||||
undo::DiffBatch,
|
||||
utils::subscription::{SubscriberSetWithQueue, Subscription},
|
||||
version::{shrink_frontiers, Frontiers, ImVersionVector},
|
||||
version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange},
|
||||
ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler,
|
||||
VersionVector,
|
||||
};
|
||||
|
@ -986,15 +986,17 @@ impl LoroDoc {
|
|||
|
||||
// PERF: opt
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<()> {
|
||||
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
|
||||
if bytes.is_empty() {
|
||||
return Ok(());
|
||||
return Ok(ImportStatus::default());
|
||||
}
|
||||
|
||||
if bytes.len() == 1 {
|
||||
return self.import(&bytes[0]).map(|_| ());
|
||||
return self.import(&bytes[0]);
|
||||
}
|
||||
|
||||
let mut success = VersionRange::default();
|
||||
let mut pending = VersionRange::default();
|
||||
let mut meta_arr = bytes
|
||||
.iter()
|
||||
.map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
|
||||
|
@ -1012,8 +1014,31 @@ impl LoroDoc {
|
|||
let mut err = None;
|
||||
for (_meta, data) in meta_arr {
|
||||
match self.import(data) {
|
||||
Ok(_s) => {
|
||||
// TODO: merge
|
||||
Ok(s) => {
|
||||
for (peer, (start, end)) in s.success.iter() {
|
||||
match success.0.entry(*peer) {
|
||||
Entry::Occupied(mut e) => {
|
||||
e.get_mut().1 = *end.max(&e.get().1);
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
e.insert((*start, *end));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(p) = s.pending.as_ref() {
|
||||
for (&peer, &(start, end)) in p.iter() {
|
||||
match pending.0.entry(peer) {
|
||||
Entry::Occupied(mut e) => {
|
||||
e.get_mut().0 = start.min(e.get().0);
|
||||
e.get_mut().1 = end.min(e.get().1);
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
e.insert((start, end));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
err = Some(e);
|
||||
|
@ -1034,7 +1059,14 @@ impl LoroDoc {
|
|||
return Err(err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(ImportStatus {
|
||||
success,
|
||||
pending: if pending.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(pending)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/// Get shallow value of the document.
|
||||
|
|
|
@ -30,6 +30,17 @@ pub struct VersionVector(FxHashMap<PeerID, Counter>);
|
|||
#[derive(Debug, Clone, Default, PartialEq, Eq)]
|
||||
pub struct VersionRange(pub(crate) FxHashMap<PeerID, (Counter, Counter)>);
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! version_range {
|
||||
($($peer:expr => ($start:expr, $end:expr)),* $(,)?) => {{
|
||||
let mut map = ::fxhash::FxHashMap::default();
|
||||
$(
|
||||
map.insert($peer, ($start, $end));
|
||||
)*
|
||||
$crate::version::VersionRange::from_map(map)
|
||||
}};
|
||||
}
|
||||
|
||||
impl VersionRange {
|
||||
pub fn new() -> Self {
|
||||
Self(Default::default())
|
||||
|
|
|
@ -1289,7 +1289,7 @@ impl LoroDoc {
|
|||
/// doc2.importUpdateBatch([snapshot, updates]);
|
||||
/// ```
|
||||
#[wasm_bindgen(js_name = "importUpdateBatch")]
|
||||
pub fn import_update_batch(&mut self, data: Array) -> JsResult<()> {
|
||||
pub fn import_update_batch(&mut self, data: Array) -> JsResult<JsImportStatus> {
|
||||
let data = data
|
||||
.iter()
|
||||
.map(|x| {
|
||||
|
@ -1297,10 +1297,9 @@ impl LoroDoc {
|
|||
arr.to_vec()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
if data.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
Ok(self.0.import_batch(&data)?)
|
||||
|
||||
let status = self.0.import_batch(&data)?;
|
||||
Ok(import_status_to_js_value(status).into())
|
||||
}
|
||||
|
||||
/// Get the shallow json format of the document state.
|
||||
|
|
|
@ -751,3 +751,59 @@ describe("isDeleted", () => {
|
|||
expect(subB.isDeleted()).toBe(true);
|
||||
})
|
||||
})
|
||||
|
||||
it("test import batch", () => {
|
||||
const doc1 = new LoroDoc();
|
||||
doc1.setPeerId("1");
|
||||
doc1.getText("text").insert(0, "Hello world!");
|
||||
|
||||
const doc2 = new LoroDoc();
|
||||
doc2.setPeerId("2");
|
||||
doc2.getText("text").insert(0, "Hello world!");
|
||||
|
||||
const blob11 = doc1.export({
|
||||
mode: "updates-in-range",
|
||||
spans: [{ id: { peer: "1", counter: 0 }, len: 5 }]
|
||||
});
|
||||
const blob12 = doc1.export({
|
||||
mode: "updates-in-range",
|
||||
spans: [{ id: { peer: "1", counter: 5 }, len: 2 }]
|
||||
});
|
||||
const blob13 = doc1.export({
|
||||
mode: "updates-in-range",
|
||||
spans: [{ id: { peer: "1", counter: 6 }, len: 6 }]
|
||||
});
|
||||
|
||||
const blob21 = doc2.export({
|
||||
mode: "updates-in-range",
|
||||
spans: [{ id: { peer: "2", counter: 0 }, len: 5 }]
|
||||
});
|
||||
const blob22 = doc2.export({
|
||||
mode: "updates-in-range",
|
||||
spans: [{ id: { peer: "2", counter: 5 }, len: 1 }]
|
||||
});
|
||||
const blob23 = doc2.export({
|
||||
mode: "updates-in-range",
|
||||
spans: [{ id: { peer: "2", counter: 6 }, len: 6 }]
|
||||
});
|
||||
|
||||
const newDoc = new LoroDoc();
|
||||
const status = newDoc.importUpdateBatch([blob11, blob13, blob21, blob23]);
|
||||
|
||||
expect(status.success).toEqual(new Map([
|
||||
["1", { start: 0, end: 5 }],
|
||||
["2", { start: 0, end: 5 }]
|
||||
]));
|
||||
expect(status.pending).toEqual(new Map([
|
||||
["1", { start: 6, end: 12 }],
|
||||
["2", { start: 6, end: 12 }]
|
||||
]));
|
||||
|
||||
const status2 = newDoc.importUpdateBatch([blob12, blob22]);
|
||||
expect(status2.success).toEqual(new Map([
|
||||
["1", { start: 5, end: 12 }],
|
||||
["2", { start: 5, end: 12 }]
|
||||
]));
|
||||
expect(status2.pending).toBeNull();
|
||||
expect(newDoc.getText("text").toString()).toBe("Hello world!Hello world!");
|
||||
})
|
||||
|
|
|
@ -308,7 +308,7 @@ impl LoroDoc {
|
|||
///
|
||||
/// The data can be in arbitrary order. The import result will be the same.
|
||||
#[inline]
|
||||
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<()> {
|
||||
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
|
||||
self.doc.import_batch(bytes)
|
||||
}
|
||||
|
||||
|
|
|
@ -11,10 +11,12 @@ use std::{
|
|||
|
||||
use loro::{
|
||||
awareness::Awareness, loro_value, CommitOptions, ContainerID, ContainerTrait, ContainerType,
|
||||
ExportMode, Frontiers, FrontiersNotIncluded, LoroDoc, LoroError, LoroList, LoroMap, LoroText,
|
||||
ToJson,
|
||||
ExportMode, Frontiers, FrontiersNotIncluded, IdSpan, LoroDoc, LoroError, LoroList, LoroMap,
|
||||
LoroText, ToJson,
|
||||
};
|
||||
use loro_internal::{
|
||||
encoding::EncodedBlobMode, handler::TextDelta, id::ID, version_range, vv, LoroResult,
|
||||
};
|
||||
use loro_internal::{encoding::EncodedBlobMode, handler::TextDelta, id::ID, vv, LoroResult};
|
||||
use rand::{Rng, SeedableRng};
|
||||
use serde_json::json;
|
||||
use tracing::trace_span;
|
||||
|
@ -2257,3 +2259,53 @@ fn change_count() {
|
|||
new_doc.import(&bytes.unwrap()).unwrap();
|
||||
assert_eq!(new_doc.len_changes(), n);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn loro_import_batch_status() {
|
||||
let doc_1 = LoroDoc::new();
|
||||
doc_1.set_peer_id(1).unwrap();
|
||||
doc_1.get_text("text").insert(0, "Hello world!").unwrap();
|
||||
|
||||
let doc_2 = LoroDoc::new();
|
||||
doc_2.set_peer_id(2).unwrap();
|
||||
doc_2.get_text("text").insert(0, "Hello world!").unwrap();
|
||||
|
||||
let blob11 = doc_1
|
||||
.export(ExportMode::updates_in_range(vec![IdSpan::new(1, 0, 5)]))
|
||||
.unwrap();
|
||||
let blob12 = doc_1
|
||||
.export(ExportMode::updates_in_range(vec![IdSpan::new(1, 5, 7)]))
|
||||
.unwrap();
|
||||
let blob13 = doc_1
|
||||
.export(ExportMode::updates_in_range(vec![IdSpan::new(1, 6, 12)]))
|
||||
.unwrap();
|
||||
|
||||
let blob21 = doc_2
|
||||
.export(ExportMode::updates_in_range(vec![IdSpan::new(2, 0, 5)]))
|
||||
.unwrap();
|
||||
let blob22 = doc_2
|
||||
.export(ExportMode::updates_in_range(vec![IdSpan::new(2, 5, 6)]))
|
||||
.unwrap();
|
||||
let blob23 = doc_2
|
||||
.export(ExportMode::updates_in_range(vec![IdSpan::new(2, 6, 12)]))
|
||||
.unwrap();
|
||||
|
||||
let new_doc = LoroDoc::new();
|
||||
let status = new_doc
|
||||
.import_batch(&[blob11, blob13, blob21, blob23])
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(status.success, version_range!(1 => (0, 5), 2 => (0, 5)));
|
||||
assert_eq!(
|
||||
status.pending,
|
||||
Some(version_range!(1 => (6, 12), 2 => (6, 12)))
|
||||
);
|
||||
|
||||
let status = new_doc.import_batch(&[blob12, blob22]).unwrap();
|
||||
assert_eq!(status.success, version_range!(1 => (5, 12), 2 => (5, 12)));
|
||||
assert!(status.pending.is_none());
|
||||
assert_eq!(
|
||||
new_doc.get_text("text").to_string(),
|
||||
"Hello world!Hello world!"
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue