From cfe8652415aceff518ecd9805c625372487d2676 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Mon, 26 Aug 2024 21:21:33 +0800 Subject: [PATCH] feat: basic gc mode impl --- Cargo.lock | 1 + .../src/container/richtext/tracker.rs | 12 +-- crates/loro-internal/src/encoding.rs | 26 ++++- .../src/encoding/fast_snapshot.rs | 15 +-- crates/loro-internal/src/encoding/gc.rs | 93 +++++++++++++++++- crates/loro-internal/src/loro.rs | 29 +++++- crates/loro-internal/src/oplog.rs | 10 +- .../loro-internal/src/oplog/change_store.rs | 57 ++++++----- crates/loro-internal/src/oplog/loro_dag.rs | 44 +++++++-- .../src/state/container_store.rs | 35 ++++++- .../src/state/container_store/inner_store.rs | 31 ++++++ crates/loro-internal/src/utils/kv_wrapper.rs | 11 ++- crates/loro/Cargo.toml | 1 + crates/loro/src/lib.rs | 9 ++ crates/loro/tests/loro_rust_test.rs | 94 ++++++++++++++++++- 15 files changed, 410 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index baaef2d5..112a8d55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -991,6 +991,7 @@ dependencies = [ "generic-btree", "loro-delta 0.16.2", "loro-internal 0.16.2", + "rand", "serde_json", "tracing", ] diff --git a/crates/loro-internal/src/container/richtext/tracker.rs b/crates/loro-internal/src/container/richtext/tracker.rs index d23c493e..05abb6cf 100644 --- a/crates/loro-internal/src/container/richtext/tracker.rs +++ b/crates/loro-internal/src/container/richtext/tracker.rs @@ -82,12 +82,12 @@ impl Tracker { } pub(crate) fn insert(&mut self, mut op_id: IdFull, mut pos: usize, mut content: RichtextChunk) { - trace!( - "TrackerInsert op_id = {:#?}, pos = {:#?}, content = {:#?}", - op_id, - &pos, - &content - ); + // trace!( + // "TrackerInsert op_id = {:#?}, pos = {:#?}, content = {:#?}", + // op_id, + // &pos, + // &content + // ); // tracing::span!(tracing::Level::INFO, "TrackerInsert"); if let ControlFlow::Break(_) = self.skip_applied(op_id.id(), content.len(), |applied_counter_end| { diff --git a/crates/loro-internal/src/encoding.rs b/crates/loro-internal/src/encoding.rs index e5789139..d0ccff13 100644 --- a/crates/loro-internal/src/encoding.rs +++ b/crates/loro-internal/src/encoding.rs @@ -36,6 +36,7 @@ pub(crate) enum EncodeMode { Snapshot = 2, FastSnapshot = 3, FastUpdates = 4, + GcSnapshot = 5, } impl num_traits::FromPrimitive for EncodeMode { @@ -48,6 +49,7 @@ impl num_traits::FromPrimitive for EncodeMode { n if n == EncodeMode::Snapshot as i64 => Some(EncodeMode::Snapshot), n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot), n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates), + n if n == EncodeMode::GcSnapshot as i64 => Some(EncodeMode::GcSnapshot), _ => None, } } @@ -67,6 +69,7 @@ impl num_traits::ToPrimitive for EncodeMode { EncodeMode::Snapshot => EncodeMode::Snapshot as i64, EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64, EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64, + EncodeMode::GcSnapshot => EncodeMode::GcSnapshot as i64, }) } #[inline] @@ -176,7 +179,7 @@ pub(crate) fn decode_oplog( let ParsedHeaderAndBody { mode, body, .. } = parsed; match mode { EncodeMode::Rle | EncodeMode::Snapshot => encode_reordered::decode_updates(oplog, body), - EncodeMode::FastSnapshot | EncodeMode::FastUpdates => { + EncodeMode::FastSnapshot | EncodeMode::FastUpdates | EncodeMode::GcSnapshot => { fast_snapshot::decode_oplog(oplog, body) } EncodeMode::Auto => unreachable!(), @@ -200,7 +203,7 @@ impl ParsedHeaderAndBody<'_> { return Err(LoroError::DecodeChecksumMismatchError); } } - EncodeMode::FastSnapshot | EncodeMode::FastUpdates => { + EncodeMode::FastSnapshot | EncodeMode::FastUpdates | EncodeMode::GcSnapshot => { let expected = u32::from_le_bytes(self.checksum[12..16].try_into().unwrap()); if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected { return Err(LoroError::DecodeChecksumMismatchError); @@ -301,6 +304,24 @@ pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec ans } +pub(crate) fn export_gc_snapshot(doc: &LoroDoc, f: &Frontiers) -> Vec { + // HEADER + let mut ans = Vec::with_capacity(MIN_HEADER_SIZE); + ans.extend(MAGIC_BYTES); + let checksum = [0; 16]; + ans.extend(checksum); + ans.extend(EncodeMode::GcSnapshot.to_bytes()); + + // BODY + gc::export_gc_snapshot(doc, f, &mut ans); + + // CHECKSUM in HEADER + let checksum_body = &ans[20..]; + let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED); + ans[16..20].copy_from_slice(&checksum.to_le_bytes()); + ans +} + pub(crate) fn decode_snapshot( doc: &LoroDoc, mode: EncodeMode, @@ -309,6 +330,7 @@ pub(crate) fn decode_snapshot( match mode { EncodeMode::Snapshot => encode_reordered::decode_snapshot(doc, body), EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot(doc, body.to_vec().into()), + EncodeMode::GcSnapshot => gc::import_gc_snapshot(doc, body.to_vec().into()), _ => unreachable!(), } } diff --git a/crates/loro-internal/src/encoding/fast_snapshot.rs b/crates/loro-internal/src/encoding/fast_snapshot.rs index 6326e803..92bea78a 100644 --- a/crates/loro-internal/src/encoding/fast_snapshot.rs +++ b/crates/loro-internal/src/encoding/fast_snapshot.rs @@ -21,13 +21,13 @@ use loro_common::{LoroError, LoroResult}; use super::encode_reordered::import_changes_to_oplog; -struct Snapshot { - oplog_bytes: Bytes, - state_bytes: Bytes, - gc_bytes: Bytes, +pub(super) struct Snapshot { + pub oplog_bytes: Bytes, + pub state_bytes: Bytes, + pub gc_bytes: Bytes, } -fn _encode_snapshot(s: Snapshot, w: &mut W) { +pub(super) fn _encode_snapshot(s: Snapshot, w: &mut W) { w.write_all(&(s.oplog_bytes.len() as u32).to_le_bytes()) .unwrap(); w.write_all(&s.oplog_bytes).unwrap(); @@ -39,7 +39,7 @@ fn _encode_snapshot(s: Snapshot, w: &mut W) { w.write_all(&s.gc_bytes).unwrap(); } -fn _decode_snapshot_bytes(bytes: Bytes) -> LoroResult { +pub(super) fn _decode_snapshot_bytes(bytes: Bytes) -> LoroResult { let mut r = bytes.reader(); let oplog_bytes_len = read_u32_le(&mut r) as usize; let oplog_bytes = r.get_mut().copy_to_bytes(oplog_bytes_len); @@ -98,10 +98,11 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> { } impl OpLog { - fn decode_change_store(&mut self, bytes: bytes::Bytes) -> LoroResult<()> { + pub(super) fn decode_change_store(&mut self, bytes: bytes::Bytes) -> LoroResult<()> { let v = self.change_store().import_all(bytes)?; self.next_lamport = v.next_lamport; self.latest_timestamp = v.max_timestamp; + // FIXME: handle start vv and start frontiers self.dag.set_version_by_fast_snapshot_import(v); Ok(()) } diff --git a/crates/loro-internal/src/encoding/gc.rs b/crates/loro-internal/src/encoding/gc.rs index 4d780653..a3846704 100644 --- a/crates/loro-internal/src/encoding/gc.rs +++ b/crates/loro-internal/src/encoding/gc.rs @@ -1,11 +1,96 @@ -use crate::{dag::DagUtils, version::Frontiers, LoroDoc}; +use bytes::Bytes; +use loro_common::LoroResult; +use tracing::debug; -pub(crate) fn export_gc_snapshot(doc: &LoroDoc, frontiers: &Frontiers) -> (Vec, Frontiers) { +use crate::{ + dag::DagUtils, + encoding::fast_snapshot::{Snapshot, _encode_snapshot}, + version::Frontiers, + LoroDoc, +}; + +use super::fast_snapshot::_decode_snapshot_bytes; + +#[tracing::instrument(skip_all)] +pub(crate) fn export_gc_snapshot( + doc: &LoroDoc, + start_from: &Frontiers, + w: &mut W, +) -> LoroResult { + assert!(!doc.is_detached()); let oplog = doc.oplog().lock().unwrap(); + let start_from = calc_actual_start(&oplog, start_from); + let start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap(); + debug!( + "start version vv={:?} frontiers={:?}", + &start_vv, &start_from, + ); + + let oplog_bytes = oplog.export_from_fast(&start_vv); + drop(oplog); + doc.checkout(&start_from)?; + let mut state = doc.app_state().lock().unwrap(); + let gc_state_bytes = state.store.encode(); + let old_kv = state.store.get_kv().clone(); + drop(state); + doc.checkout_to_latest(); + let mut state = doc.app_state().lock().unwrap(); + state.store.encode(); + let new_kv = state.store.get_kv().clone(); + new_kv.remove_same(&old_kv); + let state_bytes = new_kv.export(); + let snapshot = Snapshot { + oplog_bytes, + state_bytes, + gc_bytes: gc_state_bytes, + }; + + _encode_snapshot(snapshot, w); + Ok(start_from) +} + +/// The real start version should be the lca of the given one and the latest frontiers +fn calc_actual_start(oplog: &crate::OpLog, frontiers: &Frontiers) -> Frontiers { + let mut frontiers = frontiers; + let f; + if frontiers == oplog.frontiers() && !frontiers.is_empty() { + // This is not allowed. + // We need to at least export one op + f = Some(oplog.get_deps_of(frontiers[0]).unwrap()); + frontiers = f.as_ref().unwrap(); + } + // start is the real start frontiers let (start, _) = oplog .dag() - .find_common_ancestor(&frontiers, &oplog.frontiers()); + .find_common_ancestor(frontiers, oplog.frontiers()); - todo!() + let cur_f = oplog.frontiers(); + oplog.dag.find_common_ancestor(&start, cur_f).0 +} + +pub(crate) fn import_gc_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> { + let mut oplog = doc.oplog().lock().unwrap(); + let mut state = doc.app_state().lock().unwrap(); + if !oplog.is_empty() || !state.is_empty() { + panic!() + } + + let Snapshot { + oplog_bytes, + state_bytes, + gc_bytes, + } = _decode_snapshot_bytes(bytes)?; + oplog.decode_change_store(oplog_bytes)?; + if !gc_bytes.is_empty() { + state + .store + .decode_gc(gc_bytes, state_bytes, oplog.dag().start_frontiers().clone())?; + } else { + state.store.decode(state_bytes)?; + } + // FIXME: we may need to extract the unknown containers here? + // Or we should lazy load it when the time comes? + state.init_with_states_and_version(oplog.frontiers().clone(), &oplog, vec![], false); + Ok(()) } diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index d0e37207..927b7690 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -28,8 +28,9 @@ use crate::{ dag::DagUtils, diff_calc::DiffCalculator, encoding::{ - decode_snapshot, export_fast_snapshot, export_fast_updates, export_snapshot, - json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, ParsedHeaderAndBody, + decode_snapshot, export_fast_snapshot, export_fast_updates, export_gc_snapshot, + export_snapshot, json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, + ParsedHeaderAndBody, }, event::{str_to_path, EventTriggerKind, Index, InternalDocDiff}, handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler}, @@ -40,7 +41,7 @@ use crate::{ state::DocState, txn::Transaction, undo::DiffBatch, - version::Frontiers, + version::{Frontiers, ImVersionVector}, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler, VersionVector, }; @@ -491,6 +492,7 @@ impl LoroDoc { #[tracing::instrument(skip_all)] fn _import_with(&self, bytes: &[u8], origin: InternalString) -> Result<(), LoroError> { let parsed = parse_header_and_body(bytes)?; + info!("Importing with mode={:?}", &parsed.mode); match parsed.mode { EncodeMode::Rle => { if self.state.lock().unwrap().is_in_txn() { @@ -541,6 +543,17 @@ impl LoroDoc { origin, )?; } + EncodeMode::GcSnapshot => { + if self.can_reset_with_snapshot() { + tracing::info!("Init by fast snapshot {}", self.peer_id()); + decode_snapshot(self, parsed.mode, parsed.body)?; + } else { + self.update_oplog_and_apply_delta_to_state_if_needed( + |oplog| oplog.decode(parsed), + origin, + )?; + } + } EncodeMode::Auto => { unreachable!() } @@ -1452,12 +1465,20 @@ impl LoroDoc { let ans = match mode { ExportMode::Snapshot => export_fast_snapshot(self), ExportMode::Updates(vv) => export_fast_updates(self, vv), - ExportMode::GcSnapshot(_) => todo!(), + ExportMode::GcSnapshot(f) => export_gc_snapshot(self, f), }; self.renew_txn_if_auto_commit(); ans } + + pub fn trimmed_vv(&self) -> ImVersionVector { + self.oplog().lock().unwrap().trimmed_vv().clone() + } + + pub fn trimmed_frontiers(&self) -> Frontiers { + self.oplog().lock().unwrap().trimmed_frontiers().clone() + } } fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option { diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 071c1a04..994f3f73 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -24,7 +24,7 @@ use crate::history_cache::ContainerHistoryCache; use crate::id::{Counter, PeerID, ID}; use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp}; use crate::span::{HasCounterSpan, HasLamportSpan}; -use crate::version::{Frontiers, VersionVector}; +use crate::version::{Frontiers, ImVersionVector, VersionVector}; use crate::LoroError; use change_store::BlockOpRef; use loro_common::{IdLp, IdSpan}; @@ -616,6 +616,14 @@ impl OpLog { pub fn check_dag_correctness(&self) { self.dag.check_dag_correctness(); } + + pub fn trimmed_vv(&self) -> &ImVersionVector { + self.dag.start_vv() + } + + pub fn trimmed_frontiers(&self) -> &Frontiers { + self.dag.start_frontiers() + } } #[derive(Debug)] diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 868cb280..b24a09c4 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -123,6 +123,7 @@ impl ChangeStore { self.external_kv.lock().unwrap().export_all() } + #[tracing::instrument(skip(self), level = "debug")] pub(super) fn export_from( &self, start_vv: &VersionVector, @@ -144,8 +145,7 @@ impl ChangeStore { assert_ne!(start, end); let ch = c.slice(start, end); - new_store.insert_change(ch, false); - for dep in c.deps.iter() { + for dep in ch.deps.iter() { if start_vv.includes_id(*dep) { match start_frontiers.entry(dep.peer) { std::collections::hash_map::Entry::Occupied(v) => { @@ -159,14 +159,19 @@ impl ChangeStore { } } } + new_store.insert_change(ch, false); } } - let start_frontiers = Frontiers::from_iter( - start_frontiers - .into_iter() - .map(|(peer, counter)| ID::new(peer, counter)), - ); + let start_frontiers = if latest_vv == start_vv { + latest_frontiers.clone() + } else { + Frontiers::from_iter( + start_frontiers + .into_iter() + .map(|(peer, counter)| ID::new(peer, counter)), + ) + }; new_store.encode_from(start_vv, &start_frontiers, latest_vv, latest_frontiers) } @@ -456,21 +461,28 @@ mod mut_external_kv { let mut max_lamport = None; let mut max_timestamp = 0; drop(kv_store); - for id in frontiers.iter() { - let c = self.get_change(*id).unwrap(); - debug_assert_ne!(c.atom_len(), 0); - let l = c.lamport_last(); - if let Some(x) = max_lamport { - if l > x { + trace!( + "frontiers = {:#?}\n start_frontiers={:#?}", + &frontiers, + &start_frontiers + ); + if frontiers != start_frontiers { + for id in frontiers.iter() { + let c = self.get_change(*id).unwrap(); + debug_assert_ne!(c.atom_len(), 0); + let l = c.lamport_last(); + if let Some(x) = max_lamport { + if l > x { + max_lamport = Some(l); + } + } else { max_lamport = Some(l); } - } else { - max_lamport = Some(l); - } - let t = c.timestamp; - if t > max_timestamp { - max_timestamp = t; + let t = c.timestamp; + if t > max_timestamp { + max_timestamp = t; + } } } @@ -516,8 +528,11 @@ mod mut_external_kv { let id_bytes = id.to_bytes(); let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0); assert!( - counter_start >= block.counter_range.0 - && counter_start < block.counter_range.1 + counter_start < block.counter_range.1, + "Peer={} Block Counter Range={:?}, counter_start={}", + id.peer, + &block.counter_range, + counter_start ); if counter_start > block.counter_range.0 { assert!(store.get(&id_bytes).is_some()); diff --git a/crates/loro-internal/src/oplog/loro_dag.rs b/crates/loro-internal/src/oplog/loro_dag.rs index 9efdf479..7487f05f 100644 --- a/crates/loro-internal/src/oplog/loro_dag.rs +++ b/crates/loro-internal/src/oplog/loro_dag.rs @@ -12,6 +12,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Display; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use tracing::trace; use super::change_store::BatchDecodeInfo; use super::ChangeStore; @@ -30,6 +31,10 @@ pub struct AppDag { frontiers: Frontiers, /// The latest known version vectorG vv: VersionVector, + /// The latest known frontiers + start_frontiers: Frontiers, + /// The latest known version vectorG + start_vv: ImVersionVector, /// Ops included in the version vector but not parsed yet /// /// # Invariants @@ -94,6 +99,8 @@ impl AppDag { vv: VersionVector::default(), unparsed_vv: Mutex::new(VersionVector::default()), unhandled_dep_points: Mutex::new(BTreeSet::new()), + start_frontiers: Default::default(), + start_vv: Default::default(), } } @@ -105,6 +112,14 @@ impl AppDag { &self.vv } + pub fn start_vv(&self) -> &ImVersionVector { + &self.start_vv + } + + pub fn start_frontiers(&self) -> &Frontiers { + &self.start_frontiers + } + pub fn is_empty(&self) -> bool { self.vv.is_empty() } @@ -376,6 +391,7 @@ impl AppDag { return; } + trace!("Trying to get id={}", id); let Some(nodes) = self.change_store.get_dag_nodes_that_contains(id) else { panic!("unparsed vv don't match with change store. Id:{id} is not in change store") }; @@ -391,6 +407,8 @@ impl AppDag { vv: self.vv.clone(), unparsed_vv: Mutex::new(self.unparsed_vv.try_lock().unwrap().clone()), unhandled_dep_points: Mutex::new(self.unhandled_dep_points.try_lock().unwrap().clone()), + start_frontiers: self.start_frontiers.clone(), + start_vv: self.start_vv.clone(), } } @@ -403,6 +421,10 @@ impl AppDag { *self.unparsed_vv.try_lock().unwrap() = v.vv.clone(); self.vv = v.vv; self.frontiers = v.frontiers; + if let Some((vv, f)) = v.start_version { + self.start_frontiers = f; + self.start_vv = ImVersionVector::from_vv(&vv); + } } /// This method is slow and should only be used for debugging and testing. @@ -658,16 +680,22 @@ impl AppDag { } let mut ans_vv = ImVersionVector::default(); - for id in node.deps.iter() { - let node = self.get(*id).expect("deps should be in the dag"); - let dep_vv = self.ensure_vv_for(&node); - if ans_vv.is_empty() { - ans_vv = dep_vv; - } else { - ans_vv.extend_to_include_vv(dep_vv.iter()); + if node.deps == self.start_frontiers { + for (&p, &c) in self.start_vv.iter() { + ans_vv.insert(p, c); } + } else { + for id in node.deps.iter() { + let node = self.get(*id).expect("deps should be in the dag"); + let dep_vv = self.ensure_vv_for(&node); + if ans_vv.is_empty() { + ans_vv = dep_vv; + } else { + ans_vv.extend_to_include_vv(dep_vv.iter()); + } - ans_vv.insert(node.peer, node.ctr_end()); + ans_vv.insert(node.peer, node.ctr_end()); + } } node.vv.set(ans_vv.clone()).unwrap(); diff --git a/crates/loro-internal/src/state/container_store.rs b/crates/loro-internal/src/state/container_store.rs index e0f8d75d..7457081b 100644 --- a/crates/loro-internal/src/state/container_store.rs +++ b/crates/loro-internal/src/state/container_store.rs @@ -6,6 +6,9 @@ use crate::{ configure::Configure, container::idx::ContainerIdx, state::{FastStateSnapshot, RichtextState}, + utils::kv_wrapper::KvWrapper, + version::Frontiers, + VersionVector, }; use bytes::Bytes; use fxhash::FxHashMap; @@ -54,6 +57,7 @@ mod inner_store; pub(crate) struct ContainerStore { arena: SharedArena, store: InnerStore, + gc_store: Option>, conf: Configure, peer: Arc, } @@ -66,6 +70,11 @@ impl std::fmt::Debug for ContainerStore { } } +struct GcStore { + start_frontiers: Frontiers, + store: InnerStore, +} + macro_rules! ctx { ($self:expr) => { ContainerCreationContext { @@ -81,6 +90,7 @@ impl ContainerStore { store: InnerStore::new(arena.clone()), arena, conf, + gc_store: None, peer, } } @@ -108,10 +118,28 @@ impl ContainerStore { self.store.encode() } - pub fn decode(&mut self, bytes: Bytes) -> LoroResult<()> { + pub(crate) fn decode(&mut self, bytes: Bytes) -> LoroResult<()> { self.store.decode(bytes) } + pub(crate) fn decode_gc( + &mut self, + gc_bytes: Bytes, + state_bytes: Bytes, + start_frontiers: Frontiers, + ) -> LoroResult<()> { + assert!(self.gc_store.is_none()); + self.store.decode_twice(gc_bytes.clone(), state_bytes)?; + if !start_frontiers.is_empty() { + self.gc_store = Some(Box::new(GcStore { + start_frontiers, + store: InnerStore::new(self.arena.clone()), + })); + self.gc_store.as_mut().unwrap().store.decode(gc_bytes); + } + Ok(()) + } + pub fn iter_and_decode_all(&mut self) -> impl Iterator { self.store.iter_all_containers_mut().map(|(idx, v)| { v.get_state_mut( @@ -124,6 +152,10 @@ impl ContainerStore { }) } + pub fn get_kv(&self) -> &KvWrapper { + self.store.get_kv() + } + pub fn is_empty(&self) -> bool { self.store.is_empty() } @@ -179,6 +211,7 @@ impl ContainerStore { arena, conf: config, peer, + gc_store: None, } } diff --git a/crates/loro-internal/src/state/container_store/inner_store.rs b/crates/loro-internal/src/state/container_store/inner_store.rs index df39be27..dee60b8a 100644 --- a/crates/loro-internal/src/state/container_store/inner_store.rs +++ b/crates/loro-internal/src/state/container_store/inner_store.rs @@ -87,6 +87,10 @@ impl InnerStore { self.kv.export() } + pub(crate) fn get_kv(&self) -> &KvWrapper { + &self.kv + } + pub(crate) fn decode(&mut self, bytes: bytes::Bytes) -> Result<(), loro_common::LoroError> { assert!(self.len == 0); self.kv.import(bytes); @@ -109,6 +113,33 @@ impl InnerStore { Ok(()) } + pub(crate) fn decode_twice( + &mut self, + bytes_a: bytes::Bytes, + bytes_b: bytes::Bytes, + ) -> Result<(), loro_common::LoroError> { + assert!(self.len == 0); + self.kv.import(bytes_a); + self.kv.import(bytes_b); + self.kv.with_kv(|kv| { + let mut count = 0; + let iter = kv.scan(Bound::Unbounded, Bound::Unbounded); + for (k, v) in iter { + count += 1; + let cid = ContainerID::from_bytes(&k); + let parent = ContainerWrapper::decode_parent(&v); + let idx = self.arena.register_container(&cid); + let p = parent.as_ref().map(|p| self.arena.register_container(p)); + self.arena.set_parent(idx, p); + } + + self.len = count; + }); + + self.all_loaded = false; + Ok(()) + } + fn load_all(&mut self) { if self.all_loaded { return; diff --git a/crates/loro-internal/src/utils/kv_wrapper.rs b/crates/loro-internal/src/utils/kv_wrapper.rs index 4e120a65..17e2bd47 100644 --- a/crates/loro-internal/src/utils/kv_wrapper.rs +++ b/crates/loro-internal/src/utils/kv_wrapper.rs @@ -39,7 +39,6 @@ impl KvWrapper { pub fn import(&self, bytes: Bytes) { let mut kv = self.kv.lock().unwrap(); - assert!(kv.len() == 0); kv.import_all(bytes); } @@ -68,4 +67,14 @@ impl KvWrapper { pub(crate) fn contains_key(&self, key: &[u8]) -> bool { self.kv.lock().unwrap().contains_key(key) } + + pub(crate) fn remove_same(&self, old_kv: &KvWrapper) { + let other = old_kv.kv.lock().unwrap(); + let mut this = self.kv.lock().unwrap(); + for (k, v) in other.scan(Bound::Unbounded, Bound::Unbounded) { + if this.get(&k) == Some(v) { + this.remove(&k) + } + } + } } diff --git a/crates/loro/Cargo.toml b/crates/loro/Cargo.toml index b373ad42..ae13572e 100644 --- a/crates/loro/Cargo.toml +++ b/crates/loro/Cargo.toml @@ -26,6 +26,7 @@ serde_json = "1.0.87" anyhow = "1.0.83" ctor = "0.2" dev-utils = { path = "../dev-utils" } +rand = "0.8.5" [features] counter = ["loro-internal/counter"] diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index de809b4a..1c3039fb 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -14,6 +14,7 @@ use loro_internal::handler::HandlerTrait; use loro_internal::handler::ValueOrHandler; use loro_internal::json::JsonChange; use loro_internal::undo::{OnPop, OnPush}; +use loro_internal::version::ImVersionVector; use loro_internal::DocState; use loro_internal::LoroDoc as InnerLoroDoc; use loro_internal::OpLog; @@ -436,6 +437,14 @@ impl LoroDoc { self.doc.state_vv() } + /// Get the `VersionVector` of trimmed history + /// + /// The ops included by the trimmed history are not in the doc. + #[inline] + pub fn trimmed_vv(&self) -> ImVersionVector { + self.doc.trimmed_vv() + } + /// Get the total number of operations in the `OpLog` #[inline] pub fn len_ops(&self) -> usize { diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index 36f347e1..541a25f2 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -4,12 +4,13 @@ use std::{ }; use loro::{ - awareness::Awareness, loro_value, FrontiersNotIncluded, LoroDoc, LoroError, LoroList, LoroMap, - LoroText, ToJson, VersionVector, + awareness::Awareness, loro_value, Frontiers, FrontiersNotIncluded, LoroDoc, LoroError, + LoroList, LoroMap, LoroText, ToJson, VersionVector, }; use loro_internal::{handler::TextDelta, id::ID, vv, LoroResult}; +use rand::{Rng, SeedableRng}; use serde_json::json; -use tracing::trace_span; +use tracing::{trace, trace_span}; mod integration_test; @@ -956,3 +957,90 @@ fn new_update_encode_mode() { // Check equality after syncing back assert_eq!(doc.get_deep_value(), doc2.get_deep_value()); } + +fn apply_random_ops(doc: &LoroDoc, seed: u64, mut op_len: usize) { + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + while op_len > 0 { + match rng.gen_range(0..4) { + 0 => { + // Insert text + let text = doc.get_text("text"); + let pos = rng.gen_range(0..=text.len_unicode()); + let content = rng.gen_range('A'..='z').to_string(); + text.insert(pos, &content).unwrap(); + op_len -= 1; + } + 1 => { + // Delete text + let text = doc.get_text("text"); + if text.len_unicode() > 0 { + let start = rng.gen_range(0..text.len_unicode()); + text.delete(start, 1).unwrap(); + op_len -= 1; + } + } + 2 => { + // Insert into map + let map = doc.get_map("map"); + let key = format!("key{}", rng.gen::()); + let value = rng.gen::(); + map.insert(&key, value).unwrap(); + op_len -= 1; + } + 3 => { + // Push to list + let list = doc.get_list("list"); + let item = format!("item{}", rng.gen::()); + list.push(item).unwrap(); + op_len -= 1; + } + _ => unreachable!(), + } + } + + doc.commit(); +} + +#[test] +fn test_gc_sync() { + let doc = LoroDoc::new(); + apply_random_ops(&doc, 123, 11); + let bytes = doc.export(loro::ExportMode::GcSnapshot( + &ID::new(doc.peer_id(), 10).into(), + )); + + let new_doc = LoroDoc::new(); + new_doc.import(&bytes).unwrap(); + assert_eq!(doc.get_deep_value(), new_doc.get_deep_value()); + let trim_end = new_doc.trimmed_vv().get(&doc.peer_id()).copied().unwrap(); + assert_eq!(trim_end, 10); + + apply_random_ops(&new_doc, 1234, 5); + let updates = new_doc.export(loro::ExportMode::Updates(&doc.oplog_vv())); + doc.import(&updates).unwrap(); + assert_eq!(doc.get_deep_value(), new_doc.get_deep_value()); + + apply_random_ops(&doc, 11, 5); + let updates = doc.export(loro::ExportMode::Updates(&new_doc.oplog_vv())); + new_doc.import(&updates).unwrap(); + assert_eq!(doc.get_deep_value(), new_doc.get_deep_value()); +} + +#[test] +fn test_gc_empty() { + let doc = LoroDoc::new(); + apply_random_ops(&doc, 123, 11); + let bytes = doc.export(loro::ExportMode::GcSnapshot(&Frontiers::default())); + let new_doc = LoroDoc::new(); + new_doc.import(&bytes).unwrap(); + assert_eq!(doc.get_deep_value(), new_doc.get_deep_value()); + apply_random_ops(&new_doc, 0, 10); + doc.import(&new_doc.export_from(&Default::default())) + .unwrap(); + assert_eq!(doc.get_deep_value(), new_doc.get_deep_value()); + + let bytes = new_doc.export(loro::ExportMode::Snapshot); + let doc_c = LoroDoc::new(); + doc_c.import(&bytes).unwrap(); + assert_eq!(doc_c.get_deep_value(), new_doc.get_deep_value()); +}