From 01fccc5a7d0b7b8a122b73c34a1206471bee778a Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Mon, 9 Dec 2024 16:31:21 +0800 Subject: [PATCH] feat: return import status in import_batch method (#573) * feat: return import status in import_batch method * feat(wasm): add importUpdateBatch returned status support * chore: add changeset update * chore: update ffi return type --- .changeset/poor-fishes-fry.md | 5 +++ crates/loro-ffi/src/doc.rs | 5 ++- crates/loro-internal/src/loro.rs | 48 +++++++++++++++++++---- crates/loro-internal/src/version.rs | 11 ++++++ crates/loro-wasm/src/lib.rs | 9 ++--- crates/loro-wasm/tests/basic.test.ts | 56 +++++++++++++++++++++++++++ crates/loro/src/lib.rs | 2 +- crates/loro/tests/loro_rust_test.rs | 58 ++++++++++++++++++++++++++-- 8 files changed, 175 insertions(+), 19 deletions(-) create mode 100644 .changeset/poor-fishes-fry.md diff --git a/.changeset/poor-fishes-fry.md b/.changeset/poor-fishes-fry.md new file mode 100644 index 00000000..b6c28c83 --- /dev/null +++ b/.changeset/poor-fishes-fry.md @@ -0,0 +1,5 @@ +--- +"loro-crdt": minor +--- + +Return ImportStatus in the import_batch method diff --git a/crates/loro-ffi/src/doc.rs b/crates/loro-ffi/src/doc.rs index 5d875116..28488ac4 100644 --- a/crates/loro-ffi/src/doc.rs +++ b/crates/loro-ffi/src/doc.rs @@ -177,8 +177,9 @@ impl LoroDoc { /// /// The data can be in arbitrary order. The import result will be the same. #[inline] - pub fn import_batch(&self, bytes: &[Vec]) -> LoroResult<()> { - self.doc.import_batch(bytes) + pub fn import_batch(&self, bytes: &[Vec]) -> Result { + let status = self.doc.import_batch(bytes)?; + Ok(status.into()) } pub fn get_movable_list(&self, id: Arc) -> Arc { diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index dd8216ff..ddc17b74 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -9,7 +9,7 @@ use rle::HasLength; use std::{ borrow::Cow, cmp::Ordering, - collections::BinaryHeap, + collections::{hash_map::Entry, BinaryHeap}, ops::ControlFlow, sync::{ atomic::{ @@ -48,7 +48,7 @@ use crate::{ txn::Transaction, undo::DiffBatch, utils::subscription::{SubscriberSetWithQueue, Subscription}, - version::{shrink_frontiers, Frontiers, ImVersionVector}, + version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange}, ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler, VersionVector, }; @@ -986,15 +986,17 @@ impl LoroDoc { // PERF: opt #[tracing::instrument(skip_all)] - pub fn import_batch(&self, bytes: &[Vec]) -> LoroResult<()> { + pub fn import_batch(&self, bytes: &[Vec]) -> LoroResult { if bytes.is_empty() { - return Ok(()); + return Ok(ImportStatus::default()); } if bytes.len() == 1 { - return self.import(&bytes[0]).map(|_| ()); + return self.import(&bytes[0]); } + let mut success = VersionRange::default(); + let mut pending = VersionRange::default(); let mut meta_arr = bytes .iter() .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b))) @@ -1012,8 +1014,31 @@ impl LoroDoc { let mut err = None; for (_meta, data) in meta_arr { match self.import(data) { - Ok(_s) => { - // TODO: merge + Ok(s) => { + for (peer, (start, end)) in s.success.iter() { + match success.0.entry(*peer) { + Entry::Occupied(mut e) => { + e.get_mut().1 = *end.max(&e.get().1); + } + Entry::Vacant(e) => { + e.insert((*start, *end)); + } + } + } + + if let Some(p) = s.pending.as_ref() { + for (&peer, &(start, end)) in p.iter() { + match pending.0.entry(peer) { + Entry::Occupied(mut e) => { + e.get_mut().0 = start.min(e.get().0); + e.get_mut().1 = end.min(e.get().1); + } + Entry::Vacant(e) => { + e.insert((start, end)); + } + } + } + } } Err(e) => { err = Some(e); @@ -1034,7 +1059,14 @@ impl LoroDoc { return Err(err); } - Ok(()) + Ok(ImportStatus { + success, + pending: if pending.is_empty() { + None + } else { + Some(pending) + }, + }) } /// Get shallow value of the document. diff --git a/crates/loro-internal/src/version.rs b/crates/loro-internal/src/version.rs index 291ac6d3..aa736887 100644 --- a/crates/loro-internal/src/version.rs +++ b/crates/loro-internal/src/version.rs @@ -30,6 +30,17 @@ pub struct VersionVector(FxHashMap); #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct VersionRange(pub(crate) FxHashMap); +#[macro_export] +macro_rules! version_range { + ($($peer:expr => ($start:expr, $end:expr)),* $(,)?) => {{ + let mut map = ::fxhash::FxHashMap::default(); + $( + map.insert($peer, ($start, $end)); + )* + $crate::version::VersionRange::from_map(map) + }}; +} + impl VersionRange { pub fn new() -> Self { Self(Default::default()) diff --git a/crates/loro-wasm/src/lib.rs b/crates/loro-wasm/src/lib.rs index 50676420..f88282c3 100644 --- a/crates/loro-wasm/src/lib.rs +++ b/crates/loro-wasm/src/lib.rs @@ -1289,7 +1289,7 @@ impl LoroDoc { /// doc2.importUpdateBatch([snapshot, updates]); /// ``` #[wasm_bindgen(js_name = "importUpdateBatch")] - pub fn import_update_batch(&mut self, data: Array) -> JsResult<()> { + pub fn import_update_batch(&mut self, data: Array) -> JsResult { let data = data .iter() .map(|x| { @@ -1297,10 +1297,9 @@ impl LoroDoc { arr.to_vec() }) .collect::>(); - if data.is_empty() { - return Ok(()); - } - Ok(self.0.import_batch(&data)?) + + let status = self.0.import_batch(&data)?; + Ok(import_status_to_js_value(status).into()) } /// Get the shallow json format of the document state. diff --git a/crates/loro-wasm/tests/basic.test.ts b/crates/loro-wasm/tests/basic.test.ts index 62cca92d..62d83c12 100644 --- a/crates/loro-wasm/tests/basic.test.ts +++ b/crates/loro-wasm/tests/basic.test.ts @@ -751,3 +751,59 @@ describe("isDeleted", () => { expect(subB.isDeleted()).toBe(true); }) }) + +it("test import batch", () => { + const doc1 = new LoroDoc(); + doc1.setPeerId("1"); + doc1.getText("text").insert(0, "Hello world!"); + + const doc2 = new LoroDoc(); + doc2.setPeerId("2"); + doc2.getText("text").insert(0, "Hello world!"); + + const blob11 = doc1.export({ + mode: "updates-in-range", + spans: [{ id: { peer: "1", counter: 0 }, len: 5 }] + }); + const blob12 = doc1.export({ + mode: "updates-in-range", + spans: [{ id: { peer: "1", counter: 5 }, len: 2 }] + }); + const blob13 = doc1.export({ + mode: "updates-in-range", + spans: [{ id: { peer: "1", counter: 6 }, len: 6 }] + }); + + const blob21 = doc2.export({ + mode: "updates-in-range", + spans: [{ id: { peer: "2", counter: 0 }, len: 5 }] + }); + const blob22 = doc2.export({ + mode: "updates-in-range", + spans: [{ id: { peer: "2", counter: 5 }, len: 1 }] + }); + const blob23 = doc2.export({ + mode: "updates-in-range", + spans: [{ id: { peer: "2", counter: 6 }, len: 6 }] + }); + + const newDoc = new LoroDoc(); + const status = newDoc.importUpdateBatch([blob11, blob13, blob21, blob23]); + + expect(status.success).toEqual(new Map([ + ["1", { start: 0, end: 5 }], + ["2", { start: 0, end: 5 }] + ])); + expect(status.pending).toEqual(new Map([ + ["1", { start: 6, end: 12 }], + ["2", { start: 6, end: 12 }] + ])); + + const status2 = newDoc.importUpdateBatch([blob12, blob22]); + expect(status2.success).toEqual(new Map([ + ["1", { start: 5, end: 12 }], + ["2", { start: 5, end: 12 }] + ])); + expect(status2.pending).toBeNull(); + expect(newDoc.getText("text").toString()).toBe("Hello world!Hello world!"); +}) diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index e2210234..61ed4e5c 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -308,7 +308,7 @@ impl LoroDoc { /// /// The data can be in arbitrary order. The import result will be the same. #[inline] - pub fn import_batch(&self, bytes: &[Vec]) -> LoroResult<()> { + pub fn import_batch(&self, bytes: &[Vec]) -> LoroResult { self.doc.import_batch(bytes) } diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index aebf0f00..a93b3894 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -11,10 +11,12 @@ use std::{ use loro::{ awareness::Awareness, loro_value, CommitOptions, ContainerID, ContainerTrait, ContainerType, - ExportMode, Frontiers, FrontiersNotIncluded, LoroDoc, LoroError, LoroList, LoroMap, LoroText, - ToJson, + ExportMode, Frontiers, FrontiersNotIncluded, IdSpan, LoroDoc, LoroError, LoroList, LoroMap, + LoroText, ToJson, +}; +use loro_internal::{ + encoding::EncodedBlobMode, handler::TextDelta, id::ID, version_range, vv, LoroResult, }; -use loro_internal::{encoding::EncodedBlobMode, handler::TextDelta, id::ID, vv, LoroResult}; use rand::{Rng, SeedableRng}; use serde_json::json; use tracing::trace_span; @@ -2257,3 +2259,53 @@ fn change_count() { new_doc.import(&bytes.unwrap()).unwrap(); assert_eq!(new_doc.len_changes(), n); } + +#[test] +fn loro_import_batch_status() { + let doc_1 = LoroDoc::new(); + doc_1.set_peer_id(1).unwrap(); + doc_1.get_text("text").insert(0, "Hello world!").unwrap(); + + let doc_2 = LoroDoc::new(); + doc_2.set_peer_id(2).unwrap(); + doc_2.get_text("text").insert(0, "Hello world!").unwrap(); + + let blob11 = doc_1 + .export(ExportMode::updates_in_range(vec![IdSpan::new(1, 0, 5)])) + .unwrap(); + let blob12 = doc_1 + .export(ExportMode::updates_in_range(vec![IdSpan::new(1, 5, 7)])) + .unwrap(); + let blob13 = doc_1 + .export(ExportMode::updates_in_range(vec![IdSpan::new(1, 6, 12)])) + .unwrap(); + + let blob21 = doc_2 + .export(ExportMode::updates_in_range(vec![IdSpan::new(2, 0, 5)])) + .unwrap(); + let blob22 = doc_2 + .export(ExportMode::updates_in_range(vec![IdSpan::new(2, 5, 6)])) + .unwrap(); + let blob23 = doc_2 + .export(ExportMode::updates_in_range(vec![IdSpan::new(2, 6, 12)])) + .unwrap(); + + let new_doc = LoroDoc::new(); + let status = new_doc + .import_batch(&[blob11, blob13, blob21, blob23]) + .unwrap(); + + assert_eq!(status.success, version_range!(1 => (0, 5), 2 => (0, 5))); + assert_eq!( + status.pending, + Some(version_range!(1 => (6, 12), 2 => (6, 12))) + ); + + let status = new_doc.import_batch(&[blob12, blob22]).unwrap(); + assert_eq!(status.success, version_range!(1 => (5, 12), 2 => (5, 12))); + assert!(status.pending.is_none()); + assert_eq!( + new_doc.get_text("text").to_string(), + "Hello world!Hello world!" + ); +}