mirror of
https://github.com/loro-dev/loro.git
synced 2025-02-05 20:17:13 +00:00
feat: basic gc mode impl
This commit is contained in:
parent
82d035442c
commit
cfe8652415
15 changed files with 410 additions and 58 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -991,6 +991,7 @@ dependencies = [
|
|||
"generic-btree",
|
||||
"loro-delta 0.16.2",
|
||||
"loro-internal 0.16.2",
|
||||
"rand",
|
||||
"serde_json",
|
||||
"tracing",
|
||||
]
|
||||
|
|
|
@ -82,12 +82,12 @@ impl Tracker {
|
|||
}
|
||||
|
||||
pub(crate) fn insert(&mut self, mut op_id: IdFull, mut pos: usize, mut content: RichtextChunk) {
|
||||
trace!(
|
||||
"TrackerInsert op_id = {:#?}, pos = {:#?}, content = {:#?}",
|
||||
op_id,
|
||||
&pos,
|
||||
&content
|
||||
);
|
||||
// trace!(
|
||||
// "TrackerInsert op_id = {:#?}, pos = {:#?}, content = {:#?}",
|
||||
// op_id,
|
||||
// &pos,
|
||||
// &content
|
||||
// );
|
||||
// tracing::span!(tracing::Level::INFO, "TrackerInsert");
|
||||
if let ControlFlow::Break(_) =
|
||||
self.skip_applied(op_id.id(), content.len(), |applied_counter_end| {
|
||||
|
|
|
@ -36,6 +36,7 @@ pub(crate) enum EncodeMode {
|
|||
Snapshot = 2,
|
||||
FastSnapshot = 3,
|
||||
FastUpdates = 4,
|
||||
GcSnapshot = 5,
|
||||
}
|
||||
|
||||
impl num_traits::FromPrimitive for EncodeMode {
|
||||
|
@ -48,6 +49,7 @@ impl num_traits::FromPrimitive for EncodeMode {
|
|||
n if n == EncodeMode::Snapshot as i64 => Some(EncodeMode::Snapshot),
|
||||
n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
|
||||
n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
|
||||
n if n == EncodeMode::GcSnapshot as i64 => Some(EncodeMode::GcSnapshot),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
@ -67,6 +69,7 @@ impl num_traits::ToPrimitive for EncodeMode {
|
|||
EncodeMode::Snapshot => EncodeMode::Snapshot as i64,
|
||||
EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
|
||||
EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
|
||||
EncodeMode::GcSnapshot => EncodeMode::GcSnapshot as i64,
|
||||
})
|
||||
}
|
||||
#[inline]
|
||||
|
@ -176,7 +179,7 @@ pub(crate) fn decode_oplog(
|
|||
let ParsedHeaderAndBody { mode, body, .. } = parsed;
|
||||
match mode {
|
||||
EncodeMode::Rle | EncodeMode::Snapshot => encode_reordered::decode_updates(oplog, body),
|
||||
EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
|
||||
EncodeMode::FastSnapshot | EncodeMode::FastUpdates | EncodeMode::GcSnapshot => {
|
||||
fast_snapshot::decode_oplog(oplog, body)
|
||||
}
|
||||
EncodeMode::Auto => unreachable!(),
|
||||
|
@ -200,7 +203,7 @@ impl ParsedHeaderAndBody<'_> {
|
|||
return Err(LoroError::DecodeChecksumMismatchError);
|
||||
}
|
||||
}
|
||||
EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
|
||||
EncodeMode::FastSnapshot | EncodeMode::FastUpdates | EncodeMode::GcSnapshot => {
|
||||
let expected = u32::from_le_bytes(self.checksum[12..16].try_into().unwrap());
|
||||
if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
|
||||
return Err(LoroError::DecodeChecksumMismatchError);
|
||||
|
@ -301,6 +304,24 @@ pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8>
|
|||
ans
|
||||
}
|
||||
|
||||
pub(crate) fn export_gc_snapshot(doc: &LoroDoc, f: &Frontiers) -> Vec<u8> {
|
||||
// HEADER
|
||||
let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
|
||||
ans.extend(MAGIC_BYTES);
|
||||
let checksum = [0; 16];
|
||||
ans.extend(checksum);
|
||||
ans.extend(EncodeMode::GcSnapshot.to_bytes());
|
||||
|
||||
// BODY
|
||||
gc::export_gc_snapshot(doc, f, &mut ans);
|
||||
|
||||
// CHECKSUM in HEADER
|
||||
let checksum_body = &ans[20..];
|
||||
let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
|
||||
ans[16..20].copy_from_slice(&checksum.to_le_bytes());
|
||||
ans
|
||||
}
|
||||
|
||||
pub(crate) fn decode_snapshot(
|
||||
doc: &LoroDoc,
|
||||
mode: EncodeMode,
|
||||
|
@ -309,6 +330,7 @@ pub(crate) fn decode_snapshot(
|
|||
match mode {
|
||||
EncodeMode::Snapshot => encode_reordered::decode_snapshot(doc, body),
|
||||
EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot(doc, body.to_vec().into()),
|
||||
EncodeMode::GcSnapshot => gc::import_gc_snapshot(doc, body.to_vec().into()),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,13 +21,13 @@ use loro_common::{LoroError, LoroResult};
|
|||
|
||||
use super::encode_reordered::import_changes_to_oplog;
|
||||
|
||||
struct Snapshot {
|
||||
oplog_bytes: Bytes,
|
||||
state_bytes: Bytes,
|
||||
gc_bytes: Bytes,
|
||||
pub(super) struct Snapshot {
|
||||
pub oplog_bytes: Bytes,
|
||||
pub state_bytes: Bytes,
|
||||
pub gc_bytes: Bytes,
|
||||
}
|
||||
|
||||
fn _encode_snapshot<W: Write>(s: Snapshot, w: &mut W) {
|
||||
pub(super) fn _encode_snapshot<W: Write>(s: Snapshot, w: &mut W) {
|
||||
w.write_all(&(s.oplog_bytes.len() as u32).to_le_bytes())
|
||||
.unwrap();
|
||||
w.write_all(&s.oplog_bytes).unwrap();
|
||||
|
@ -39,7 +39,7 @@ fn _encode_snapshot<W: Write>(s: Snapshot, w: &mut W) {
|
|||
w.write_all(&s.gc_bytes).unwrap();
|
||||
}
|
||||
|
||||
fn _decode_snapshot_bytes(bytes: Bytes) -> LoroResult<Snapshot> {
|
||||
pub(super) fn _decode_snapshot_bytes(bytes: Bytes) -> LoroResult<Snapshot> {
|
||||
let mut r = bytes.reader();
|
||||
let oplog_bytes_len = read_u32_le(&mut r) as usize;
|
||||
let oplog_bytes = r.get_mut().copy_to_bytes(oplog_bytes_len);
|
||||
|
@ -98,10 +98,11 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> {
|
|||
}
|
||||
|
||||
impl OpLog {
|
||||
fn decode_change_store(&mut self, bytes: bytes::Bytes) -> LoroResult<()> {
|
||||
pub(super) fn decode_change_store(&mut self, bytes: bytes::Bytes) -> LoroResult<()> {
|
||||
let v = self.change_store().import_all(bytes)?;
|
||||
self.next_lamport = v.next_lamport;
|
||||
self.latest_timestamp = v.max_timestamp;
|
||||
// FIXME: handle start vv and start frontiers
|
||||
self.dag.set_version_by_fast_snapshot_import(v);
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,11 +1,96 @@
|
|||
use crate::{dag::DagUtils, version::Frontiers, LoroDoc};
|
||||
use bytes::Bytes;
|
||||
use loro_common::LoroResult;
|
||||
use tracing::debug;
|
||||
|
||||
pub(crate) fn export_gc_snapshot(doc: &LoroDoc, frontiers: &Frontiers) -> (Vec<u8>, Frontiers) {
|
||||
use crate::{
|
||||
dag::DagUtils,
|
||||
encoding::fast_snapshot::{Snapshot, _encode_snapshot},
|
||||
version::Frontiers,
|
||||
LoroDoc,
|
||||
};
|
||||
|
||||
use super::fast_snapshot::_decode_snapshot_bytes;
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) fn export_gc_snapshot<W: std::io::Write>(
|
||||
doc: &LoroDoc,
|
||||
start_from: &Frontiers,
|
||||
w: &mut W,
|
||||
) -> LoroResult<Frontiers> {
|
||||
assert!(!doc.is_detached());
|
||||
let oplog = doc.oplog().lock().unwrap();
|
||||
let start_from = calc_actual_start(&oplog, start_from);
|
||||
let start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap();
|
||||
debug!(
|
||||
"start version vv={:?} frontiers={:?}",
|
||||
&start_vv, &start_from,
|
||||
);
|
||||
|
||||
let oplog_bytes = oplog.export_from_fast(&start_vv);
|
||||
drop(oplog);
|
||||
doc.checkout(&start_from)?;
|
||||
let mut state = doc.app_state().lock().unwrap();
|
||||
let gc_state_bytes = state.store.encode();
|
||||
let old_kv = state.store.get_kv().clone();
|
||||
drop(state);
|
||||
doc.checkout_to_latest();
|
||||
let mut state = doc.app_state().lock().unwrap();
|
||||
state.store.encode();
|
||||
let new_kv = state.store.get_kv().clone();
|
||||
new_kv.remove_same(&old_kv);
|
||||
let state_bytes = new_kv.export();
|
||||
let snapshot = Snapshot {
|
||||
oplog_bytes,
|
||||
state_bytes,
|
||||
gc_bytes: gc_state_bytes,
|
||||
};
|
||||
|
||||
_encode_snapshot(snapshot, w);
|
||||
Ok(start_from)
|
||||
}
|
||||
|
||||
/// The real start version should be the lca of the given one and the latest frontiers
|
||||
fn calc_actual_start(oplog: &crate::OpLog, frontiers: &Frontiers) -> Frontiers {
|
||||
let mut frontiers = frontiers;
|
||||
let f;
|
||||
if frontiers == oplog.frontiers() && !frontiers.is_empty() {
|
||||
// This is not allowed.
|
||||
// We need to at least export one op
|
||||
f = Some(oplog.get_deps_of(frontiers[0]).unwrap());
|
||||
frontiers = f.as_ref().unwrap();
|
||||
}
|
||||
|
||||
// start is the real start frontiers
|
||||
let (start, _) = oplog
|
||||
.dag()
|
||||
.find_common_ancestor(&frontiers, &oplog.frontiers());
|
||||
.find_common_ancestor(frontiers, oplog.frontiers());
|
||||
|
||||
todo!()
|
||||
let cur_f = oplog.frontiers();
|
||||
oplog.dag.find_common_ancestor(&start, cur_f).0
|
||||
}
|
||||
|
||||
pub(crate) fn import_gc_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> {
|
||||
let mut oplog = doc.oplog().lock().unwrap();
|
||||
let mut state = doc.app_state().lock().unwrap();
|
||||
if !oplog.is_empty() || !state.is_empty() {
|
||||
panic!()
|
||||
}
|
||||
|
||||
let Snapshot {
|
||||
oplog_bytes,
|
||||
state_bytes,
|
||||
gc_bytes,
|
||||
} = _decode_snapshot_bytes(bytes)?;
|
||||
oplog.decode_change_store(oplog_bytes)?;
|
||||
if !gc_bytes.is_empty() {
|
||||
state
|
||||
.store
|
||||
.decode_gc(gc_bytes, state_bytes, oplog.dag().start_frontiers().clone())?;
|
||||
} else {
|
||||
state.store.decode(state_bytes)?;
|
||||
}
|
||||
// FIXME: we may need to extract the unknown containers here?
|
||||
// Or we should lazy load it when the time comes?
|
||||
state.init_with_states_and_version(oplog.frontiers().clone(), &oplog, vec![], false);
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -28,8 +28,9 @@ use crate::{
|
|||
dag::DagUtils,
|
||||
diff_calc::DiffCalculator,
|
||||
encoding::{
|
||||
decode_snapshot, export_fast_snapshot, export_fast_updates, export_snapshot,
|
||||
json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, ParsedHeaderAndBody,
|
||||
decode_snapshot, export_fast_snapshot, export_fast_updates, export_gc_snapshot,
|
||||
export_snapshot, json_schema::json::JsonSchema, parse_header_and_body, EncodeMode,
|
||||
ParsedHeaderAndBody,
|
||||
},
|
||||
event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
|
||||
handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
|
||||
|
@ -40,7 +41,7 @@ use crate::{
|
|||
state::DocState,
|
||||
txn::Transaction,
|
||||
undo::DiffBatch,
|
||||
version::Frontiers,
|
||||
version::{Frontiers, ImVersionVector},
|
||||
HandlerTrait, InternalString, ListHandler, LoroError, MapHandler, VersionVector,
|
||||
};
|
||||
|
||||
|
@ -491,6 +492,7 @@ impl LoroDoc {
|
|||
#[tracing::instrument(skip_all)]
|
||||
fn _import_with(&self, bytes: &[u8], origin: InternalString) -> Result<(), LoroError> {
|
||||
let parsed = parse_header_and_body(bytes)?;
|
||||
info!("Importing with mode={:?}", &parsed.mode);
|
||||
match parsed.mode {
|
||||
EncodeMode::Rle => {
|
||||
if self.state.lock().unwrap().is_in_txn() {
|
||||
|
@ -541,6 +543,17 @@ impl LoroDoc {
|
|||
origin,
|
||||
)?;
|
||||
}
|
||||
EncodeMode::GcSnapshot => {
|
||||
if self.can_reset_with_snapshot() {
|
||||
tracing::info!("Init by fast snapshot {}", self.peer_id());
|
||||
decode_snapshot(self, parsed.mode, parsed.body)?;
|
||||
} else {
|
||||
self.update_oplog_and_apply_delta_to_state_if_needed(
|
||||
|oplog| oplog.decode(parsed),
|
||||
origin,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
EncodeMode::Auto => {
|
||||
unreachable!()
|
||||
}
|
||||
|
@ -1452,12 +1465,20 @@ impl LoroDoc {
|
|||
let ans = match mode {
|
||||
ExportMode::Snapshot => export_fast_snapshot(self),
|
||||
ExportMode::Updates(vv) => export_fast_updates(self, vv),
|
||||
ExportMode::GcSnapshot(_) => todo!(),
|
||||
ExportMode::GcSnapshot(f) => export_gc_snapshot(self, f),
|
||||
};
|
||||
|
||||
self.renew_txn_if_auto_commit();
|
||||
ans
|
||||
}
|
||||
|
||||
/// Returns an owned copy of the oplog's trimmed version vector.
///
/// # Panics
///
/// Panics if the oplog mutex is poisoned.
pub fn trimmed_vv(&self) -> ImVersionVector {
    let oplog = self.oplog().lock().unwrap();
    oplog.trimmed_vv().clone()
}
|
||||
|
||||
/// Returns an owned copy of the oplog's trimmed frontiers.
///
/// # Panics
///
/// Panics if the oplog mutex is poisoned.
pub fn trimmed_frontiers(&self) -> Frontiers {
    let oplog = self.oplog().lock().unwrap();
    oplog.trimmed_frontiers().clone()
}
|
||||
}
|
||||
|
||||
fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
|
||||
|
|
|
@ -24,7 +24,7 @@ use crate::history_cache::ContainerHistoryCache;
|
|||
use crate::id::{Counter, PeerID, ID};
|
||||
use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
|
||||
use crate::span::{HasCounterSpan, HasLamportSpan};
|
||||
use crate::version::{Frontiers, VersionVector};
|
||||
use crate::version::{Frontiers, ImVersionVector, VersionVector};
|
||||
use crate::LoroError;
|
||||
use change_store::BlockOpRef;
|
||||
use loro_common::{IdLp, IdSpan};
|
||||
|
@ -616,6 +616,14 @@ impl OpLog {
|
|||
pub fn check_dag_correctness(&self) {
|
||||
self.dag.check_dag_correctness();
|
||||
}
|
||||
|
||||
/// Returns the start version vector tracked by the DAG (`self.dag.start_vv()`).
pub fn trimmed_vv(&self) -> &ImVersionVector {
|
||||
self.dag.start_vv()
|
||||
}
|
||||
|
||||
/// Returns the start frontiers tracked by the DAG (`self.dag.start_frontiers()`).
pub fn trimmed_frontiers(&self) -> &Frontiers {
|
||||
self.dag.start_frontiers()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
|
@ -123,6 +123,7 @@ impl ChangeStore {
|
|||
self.external_kv.lock().unwrap().export_all()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub(super) fn export_from(
|
||||
&self,
|
||||
start_vv: &VersionVector,
|
||||
|
@ -144,8 +145,7 @@ impl ChangeStore {
|
|||
|
||||
assert_ne!(start, end);
|
||||
let ch = c.slice(start, end);
|
||||
new_store.insert_change(ch, false);
|
||||
for dep in c.deps.iter() {
|
||||
for dep in ch.deps.iter() {
|
||||
if start_vv.includes_id(*dep) {
|
||||
match start_frontiers.entry(dep.peer) {
|
||||
std::collections::hash_map::Entry::Occupied(v) => {
|
||||
|
@ -159,14 +159,19 @@ impl ChangeStore {
|
|||
}
|
||||
}
|
||||
}
|
||||
new_store.insert_change(ch, false);
|
||||
}
|
||||
}
|
||||
|
||||
let start_frontiers = Frontiers::from_iter(
|
||||
start_frontiers
|
||||
.into_iter()
|
||||
.map(|(peer, counter)| ID::new(peer, counter)),
|
||||
);
|
||||
let start_frontiers = if latest_vv == start_vv {
|
||||
latest_frontiers.clone()
|
||||
} else {
|
||||
Frontiers::from_iter(
|
||||
start_frontiers
|
||||
.into_iter()
|
||||
.map(|(peer, counter)| ID::new(peer, counter)),
|
||||
)
|
||||
};
|
||||
|
||||
new_store.encode_from(start_vv, &start_frontiers, latest_vv, latest_frontiers)
|
||||
}
|
||||
|
@ -456,21 +461,28 @@ mod mut_external_kv {
|
|||
let mut max_lamport = None;
|
||||
let mut max_timestamp = 0;
|
||||
drop(kv_store);
|
||||
for id in frontiers.iter() {
|
||||
let c = self.get_change(*id).unwrap();
|
||||
debug_assert_ne!(c.atom_len(), 0);
|
||||
let l = c.lamport_last();
|
||||
if let Some(x) = max_lamport {
|
||||
if l > x {
|
||||
trace!(
|
||||
"frontiers = {:#?}\n start_frontiers={:#?}",
|
||||
&frontiers,
|
||||
&start_frontiers
|
||||
);
|
||||
if frontiers != start_frontiers {
|
||||
for id in frontiers.iter() {
|
||||
let c = self.get_change(*id).unwrap();
|
||||
debug_assert_ne!(c.atom_len(), 0);
|
||||
let l = c.lamport_last();
|
||||
if let Some(x) = max_lamport {
|
||||
if l > x {
|
||||
max_lamport = Some(l);
|
||||
}
|
||||
} else {
|
||||
max_lamport = Some(l);
|
||||
}
|
||||
} else {
|
||||
max_lamport = Some(l);
|
||||
}
|
||||
|
||||
let t = c.timestamp;
|
||||
if t > max_timestamp {
|
||||
max_timestamp = t;
|
||||
let t = c.timestamp;
|
||||
if t > max_timestamp {
|
||||
max_timestamp = t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -516,8 +528,11 @@ mod mut_external_kv {
|
|||
let id_bytes = id.to_bytes();
|
||||
let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0);
|
||||
assert!(
|
||||
counter_start >= block.counter_range.0
|
||||
&& counter_start < block.counter_range.1
|
||||
counter_start < block.counter_range.1,
|
||||
"Peer={} Block Counter Range={:?}, counter_start={}",
|
||||
id.peer,
|
||||
&block.counter_range,
|
||||
counter_start
|
||||
);
|
||||
if counter_start > block.counter_range.0 {
|
||||
assert!(store.get(&id_bytes).is_some());
|
||||
|
|
|
@ -12,6 +12,7 @@ use std::collections::{BTreeMap, BTreeSet};
|
|||
use std::fmt::Display;
|
||||
use std::ops::Deref;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tracing::trace;
|
||||
|
||||
use super::change_store::BatchDecodeInfo;
|
||||
use super::ChangeStore;
|
||||
|
@ -30,6 +31,10 @@ pub struct AppDag {
|
|||
frontiers: Frontiers,
|
||||
/// The latest known version vector
|
||||
vv: VersionVector,
|
||||
/// The frontiers at which the stored history starts (exposed as `trimmed_frontiers`)
|
||||
start_frontiers: Frontiers,
|
||||
/// The version vector at which the stored history starts (exposed as `trimmed_vv`)
|
||||
start_vv: ImVersionVector,
|
||||
/// Ops included in the version vector but not parsed yet
|
||||
///
|
||||
/// # Invariants
|
||||
|
@ -94,6 +99,8 @@ impl AppDag {
|
|||
vv: VersionVector::default(),
|
||||
unparsed_vv: Mutex::new(VersionVector::default()),
|
||||
unhandled_dep_points: Mutex::new(BTreeSet::new()),
|
||||
start_frontiers: Default::default(),
|
||||
start_vv: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,6 +112,14 @@ impl AppDag {
|
|||
&self.vv
|
||||
}
|
||||
|
||||
/// Returns the `start_vv` field.
///
/// It is default-initialized and overwritten when an imported version carries a
/// `start_version`.
pub fn start_vv(&self) -> &ImVersionVector {
|
||||
&self.start_vv
|
||||
}
|
||||
|
||||
/// Returns the `start_frontiers` field.
///
/// It is default-initialized and overwritten when an imported version carries a
/// `start_version`.
pub fn start_frontiers(&self) -> &Frontiers {
|
||||
&self.start_frontiers
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.vv.is_empty()
|
||||
}
|
||||
|
@ -376,6 +391,7 @@ impl AppDag {
|
|||
return;
|
||||
}
|
||||
|
||||
trace!("Trying to get id={}", id);
|
||||
let Some(nodes) = self.change_store.get_dag_nodes_that_contains(id) else {
|
||||
panic!("unparsed vv don't match with change store. Id:{id} is not in change store")
|
||||
};
|
||||
|
@ -391,6 +407,8 @@ impl AppDag {
|
|||
vv: self.vv.clone(),
|
||||
unparsed_vv: Mutex::new(self.unparsed_vv.try_lock().unwrap().clone()),
|
||||
unhandled_dep_points: Mutex::new(self.unhandled_dep_points.try_lock().unwrap().clone()),
|
||||
start_frontiers: self.start_frontiers.clone(),
|
||||
start_vv: self.start_vv.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -403,6 +421,10 @@ impl AppDag {
|
|||
*self.unparsed_vv.try_lock().unwrap() = v.vv.clone();
|
||||
self.vv = v.vv;
|
||||
self.frontiers = v.frontiers;
|
||||
if let Some((vv, f)) = v.start_version {
|
||||
self.start_frontiers = f;
|
||||
self.start_vv = ImVersionVector::from_vv(&vv);
|
||||
}
|
||||
}
|
||||
|
||||
/// This method is slow and should only be used for debugging and testing.
|
||||
|
@ -658,16 +680,22 @@ impl AppDag {
|
|||
}
|
||||
|
||||
let mut ans_vv = ImVersionVector::default();
|
||||
for id in node.deps.iter() {
|
||||
let node = self.get(*id).expect("deps should be in the dag");
|
||||
let dep_vv = self.ensure_vv_for(&node);
|
||||
if ans_vv.is_empty() {
|
||||
ans_vv = dep_vv;
|
||||
} else {
|
||||
ans_vv.extend_to_include_vv(dep_vv.iter());
|
||||
if node.deps == self.start_frontiers {
|
||||
for (&p, &c) in self.start_vv.iter() {
|
||||
ans_vv.insert(p, c);
|
||||
}
|
||||
} else {
|
||||
for id in node.deps.iter() {
|
||||
let node = self.get(*id).expect("deps should be in the dag");
|
||||
let dep_vv = self.ensure_vv_for(&node);
|
||||
if ans_vv.is_empty() {
|
||||
ans_vv = dep_vv;
|
||||
} else {
|
||||
ans_vv.extend_to_include_vv(dep_vv.iter());
|
||||
}
|
||||
|
||||
ans_vv.insert(node.peer, node.ctr_end());
|
||||
ans_vv.insert(node.peer, node.ctr_end());
|
||||
}
|
||||
}
|
||||
|
||||
node.vv.set(ans_vv.clone()).unwrap();
|
||||
|
|
|
@ -6,6 +6,9 @@ use crate::{
|
|||
configure::Configure,
|
||||
container::idx::ContainerIdx,
|
||||
state::{FastStateSnapshot, RichtextState},
|
||||
utils::kv_wrapper::KvWrapper,
|
||||
version::Frontiers,
|
||||
VersionVector,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use fxhash::FxHashMap;
|
||||
|
@ -54,6 +57,7 @@ mod inner_store;
|
|||
pub(crate) struct ContainerStore {
|
||||
arena: SharedArena,
|
||||
store: InnerStore,
|
||||
gc_store: Option<Box<GcStore>>,
|
||||
conf: Configure,
|
||||
peer: Arc<AtomicU64>,
|
||||
}
|
||||
|
@ -66,6 +70,11 @@ impl std::fmt::Debug for ContainerStore {
|
|||
}
|
||||
}
|
||||
|
||||
/// Extra store created by `ContainerStore::decode_gc` when the decoded snapshot has
/// non-empty start frontiers.
struct GcStore {
|
||||
/// The start frontiers passed to `decode_gc`.
start_frontiers: Frontiers,
|
||||
/// A separate store decoded from the snapshot's gc bytes only.
store: InnerStore,
|
||||
}
|
||||
|
||||
macro_rules! ctx {
|
||||
($self:expr) => {
|
||||
ContainerCreationContext {
|
||||
|
@ -81,6 +90,7 @@ impl ContainerStore {
|
|||
store: InnerStore::new(arena.clone()),
|
||||
arena,
|
||||
conf,
|
||||
gc_store: None,
|
||||
peer,
|
||||
}
|
||||
}
|
||||
|
@ -108,10 +118,28 @@ impl ContainerStore {
|
|||
self.store.encode()
|
||||
}
|
||||
|
||||
pub fn decode(&mut self, bytes: Bytes) -> LoroResult<()> {
|
||||
pub(crate) fn decode(&mut self, bytes: Bytes) -> LoroResult<()> {
|
||||
self.store.decode(bytes)
|
||||
}
|
||||
|
||||
pub(crate) fn decode_gc(
|
||||
&mut self,
|
||||
gc_bytes: Bytes,
|
||||
state_bytes: Bytes,
|
||||
start_frontiers: Frontiers,
|
||||
) -> LoroResult<()> {
|
||||
assert!(self.gc_store.is_none());
|
||||
self.store.decode_twice(gc_bytes.clone(), state_bytes)?;
|
||||
if !start_frontiers.is_empty() {
|
||||
self.gc_store = Some(Box::new(GcStore {
|
||||
start_frontiers,
|
||||
store: InnerStore::new(self.arena.clone()),
|
||||
}));
|
||||
self.gc_store.as_mut().unwrap().store.decode(gc_bytes);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn iter_and_decode_all(&mut self) -> impl Iterator<Item = &mut State> {
|
||||
self.store.iter_all_containers_mut().map(|(idx, v)| {
|
||||
v.get_state_mut(
|
||||
|
@ -124,6 +152,10 @@ impl ContainerStore {
|
|||
})
|
||||
}
|
||||
|
||||
/// Returns the kv wrapper of the main store (`self.store`).
pub fn get_kv(&self) -> &KvWrapper {
|
||||
self.store.get_kv()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.store.is_empty()
|
||||
}
|
||||
|
@ -179,6 +211,7 @@ impl ContainerStore {
|
|||
arena,
|
||||
conf: config,
|
||||
peer,
|
||||
gc_store: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -87,6 +87,10 @@ impl InnerStore {
|
|||
self.kv.export()
|
||||
}
|
||||
|
||||
/// Returns a reference to the kv wrapper backing this store.
pub(crate) fn get_kv(&self) -> &KvWrapper {
|
||||
&self.kv
|
||||
}
|
||||
|
||||
pub(crate) fn decode(&mut self, bytes: bytes::Bytes) -> Result<(), loro_common::LoroError> {
|
||||
assert!(self.len == 0);
|
||||
self.kv.import(bytes);
|
||||
|
@ -109,6 +113,33 @@ impl InnerStore {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn decode_twice(
|
||||
&mut self,
|
||||
bytes_a: bytes::Bytes,
|
||||
bytes_b: bytes::Bytes,
|
||||
) -> Result<(), loro_common::LoroError> {
|
||||
assert!(self.len == 0);
|
||||
self.kv.import(bytes_a);
|
||||
self.kv.import(bytes_b);
|
||||
self.kv.with_kv(|kv| {
|
||||
let mut count = 0;
|
||||
let iter = kv.scan(Bound::Unbounded, Bound::Unbounded);
|
||||
for (k, v) in iter {
|
||||
count += 1;
|
||||
let cid = ContainerID::from_bytes(&k);
|
||||
let parent = ContainerWrapper::decode_parent(&v);
|
||||
let idx = self.arena.register_container(&cid);
|
||||
let p = parent.as_ref().map(|p| self.arena.register_container(p));
|
||||
self.arena.set_parent(idx, p);
|
||||
}
|
||||
|
||||
self.len = count;
|
||||
});
|
||||
|
||||
self.all_loaded = false;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_all(&mut self) {
|
||||
if self.all_loaded {
|
||||
return;
|
||||
|
|
|
@ -39,7 +39,6 @@ impl KvWrapper {
|
|||
|
||||
/// Imports all entries encoded in `bytes` into the underlying kv.
///
/// The kv is not required to be empty: `InnerStore::decode_twice` calls this twice in a
/// row on the same wrapper, so an emptiness assertion here would make the second call
/// panic.
///
/// # Panics
///
/// Panics if the kv mutex is poisoned.
pub fn import(&self, bytes: Bytes) {
    let mut kv = self.kv.lock().unwrap();
    kv.import_all(bytes);
}
|
||||
|
||||
|
@ -68,4 +67,14 @@ impl KvWrapper {
|
|||
pub(crate) fn contains_key(&self, key: &[u8]) -> bool {
|
||||
self.kv.lock().unwrap().contains_key(key)
|
||||
}
|
||||
|
||||
pub(crate) fn remove_same(&self, old_kv: &KvWrapper) {
|
||||
let other = old_kv.kv.lock().unwrap();
|
||||
let mut this = self.kv.lock().unwrap();
|
||||
for (k, v) in other.scan(Bound::Unbounded, Bound::Unbounded) {
|
||||
if this.get(&k) == Some(v) {
|
||||
this.remove(&k)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ serde_json = "1.0.87"
|
|||
anyhow = "1.0.83"
|
||||
ctor = "0.2"
|
||||
dev-utils = { path = "../dev-utils" }
|
||||
rand = "0.8.5"
|
||||
|
||||
[features]
|
||||
counter = ["loro-internal/counter"]
|
||||
|
|
|
@ -14,6 +14,7 @@ use loro_internal::handler::HandlerTrait;
|
|||
use loro_internal::handler::ValueOrHandler;
|
||||
use loro_internal::json::JsonChange;
|
||||
use loro_internal::undo::{OnPop, OnPush};
|
||||
use loro_internal::version::ImVersionVector;
|
||||
use loro_internal::DocState;
|
||||
use loro_internal::LoroDoc as InnerLoroDoc;
|
||||
use loro_internal::OpLog;
|
||||
|
@ -436,6 +437,14 @@ impl LoroDoc {
|
|||
self.doc.state_vv()
|
||||
}
|
||||
|
||||
/// Get the `VersionVector` of the trimmed history.
|
||||
///
|
||||
/// The ops included by the trimmed history are not in the doc.
/// The value is obtained from the inner doc (`self.doc.trimmed_vv()`).
|
||||
#[inline]
|
||||
pub fn trimmed_vv(&self) -> ImVersionVector {
|
||||
self.doc.trimmed_vv()
|
||||
}
|
||||
|
||||
/// Get the total number of operations in the `OpLog`
|
||||
#[inline]
|
||||
pub fn len_ops(&self) -> usize {
|
||||
|
|
|
@ -4,12 +4,13 @@ use std::{
|
|||
};
|
||||
|
||||
use loro::{
|
||||
awareness::Awareness, loro_value, FrontiersNotIncluded, LoroDoc, LoroError, LoroList, LoroMap,
|
||||
LoroText, ToJson, VersionVector,
|
||||
awareness::Awareness, loro_value, Frontiers, FrontiersNotIncluded, LoroDoc, LoroError,
|
||||
LoroList, LoroMap, LoroText, ToJson, VersionVector,
|
||||
};
|
||||
use loro_internal::{handler::TextDelta, id::ID, vv, LoroResult};
|
||||
use rand::{Rng, SeedableRng};
|
||||
use serde_json::json;
|
||||
use tracing::trace_span;
|
||||
use tracing::{trace, trace_span};
|
||||
|
||||
mod integration_test;
|
||||
|
||||
|
@ -956,3 +957,90 @@ fn new_update_encode_mode() {
|
|||
// Check equality after syncing back
|
||||
assert_eq!(doc.get_deep_value(), doc2.get_deep_value());
|
||||
}
|
||||
|
||||
fn apply_random_ops(doc: &LoroDoc, seed: u64, mut op_len: usize) {
|
||||
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
|
||||
while op_len > 0 {
|
||||
match rng.gen_range(0..4) {
|
||||
0 => {
|
||||
// Insert text
|
||||
let text = doc.get_text("text");
|
||||
let pos = rng.gen_range(0..=text.len_unicode());
|
||||
let content = rng.gen_range('A'..='z').to_string();
|
||||
text.insert(pos, &content).unwrap();
|
||||
op_len -= 1;
|
||||
}
|
||||
1 => {
|
||||
// Delete text
|
||||
let text = doc.get_text("text");
|
||||
if text.len_unicode() > 0 {
|
||||
let start = rng.gen_range(0..text.len_unicode());
|
||||
text.delete(start, 1).unwrap();
|
||||
op_len -= 1;
|
||||
}
|
||||
}
|
||||
2 => {
|
||||
// Insert into map
|
||||
let map = doc.get_map("map");
|
||||
let key = format!("key{}", rng.gen::<u32>());
|
||||
let value = rng.gen::<i32>();
|
||||
map.insert(&key, value).unwrap();
|
||||
op_len -= 1;
|
||||
}
|
||||
3 => {
|
||||
// Push to list
|
||||
let list = doc.get_list("list");
|
||||
let item = format!("item{}", rng.gen::<u32>());
|
||||
list.push(item).unwrap();
|
||||
op_len -= 1;
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
doc.commit();
|
||||
}
|
||||
|
||||
/// A doc initialized from a GC snapshot must match the source doc, report the expected
/// trimmed version, and keep syncing with the source doc in both directions.
#[test]
fn test_gc_sync() {
    let doc = LoroDoc::new();
    apply_random_ops(&doc, 123, 11);
    let snapshot = doc.export(loro::ExportMode::GcSnapshot(
        &ID::new(doc.peer_id(), 10).into(),
    ));

    // Initialize a fresh doc from the gc snapshot.
    let new_doc = LoroDoc::new();
    new_doc.import(&snapshot).unwrap();
    assert_eq!(doc.get_deep_value(), new_doc.get_deep_value());
    let trim_end = new_doc.trimmed_vv().get(&doc.peer_id()).copied().unwrap();
    assert_eq!(trim_end, 10);

    // Edits made on the snapshot-initialized doc flow back to the source doc.
    apply_random_ops(&new_doc, 1234, 5);
    let from_new_doc = new_doc.export(loro::ExportMode::Updates(&doc.oplog_vv()));
    doc.import(&from_new_doc).unwrap();
    assert_eq!(doc.get_deep_value(), new_doc.get_deep_value());

    // Edits made on the source doc flow to the snapshot-initialized doc.
    apply_random_ops(&doc, 11, 5);
    let from_doc = doc.export(loro::ExportMode::Updates(&new_doc.oplog_vv()));
    new_doc.import(&from_doc).unwrap();
    assert_eq!(doc.get_deep_value(), new_doc.get_deep_value());
}
|
||||
|
||||
/// A GC snapshot exported with empty frontiers must import into a fresh doc, sync back
/// to the source doc, and survive a regular snapshot round trip.
#[test]
fn test_gc_empty() {
    let doc = LoroDoc::new();
    apply_random_ops(&doc, 123, 11);
    let gc_snapshot = doc.export(loro::ExportMode::GcSnapshot(&Frontiers::default()));

    let new_doc = LoroDoc::new();
    new_doc.import(&gc_snapshot).unwrap();
    assert_eq!(doc.get_deep_value(), new_doc.get_deep_value());

    // New edits on the imported doc can be sent back as a full update export.
    apply_random_ops(&new_doc, 0, 10);
    let all_updates = new_doc.export_from(&Default::default());
    doc.import(&all_updates).unwrap();
    assert_eq!(doc.get_deep_value(), new_doc.get_deep_value());

    // A regular snapshot of the imported doc loads into a third doc.
    let full_snapshot = new_doc.export(loro::ExportMode::Snapshot);
    let doc_c = LoroDoc::new();
    doc_c.import(&full_snapshot).unwrap();
    assert_eq!(doc_c.get_deep_value(), new_doc.get_deep_value());
}
|
||||
|
|
Loading…
Reference in a new issue