fix: snapshot load diff

This commit is contained in:
leeeon233 2023-02-16 09:09:24 +08:00 committed by Leonzhao
parent 5b6f864479
commit 7ffac80215
2 changed files with 106 additions and 47 deletions

View file

@ -27,7 +27,7 @@ fn main() {
start.elapsed().as_millis()
);
let start = Instant::now();
let buf_snapshot = loro.encode_all();
let buf_snapshot = loro.encode_with_cfg(EncodeConfig::snapshot().without_compress());
let json_snapshot = loro.to_json();
println!(

View file

@ -22,7 +22,7 @@ use crate::{
op::{InnerContent, Op},
span::{HasIdSpan, HasLamportSpan},
version::TotalOrderStamp,
ContainerType, InternalString, LogStore, LoroError, LoroValue, VersionVector,
ContainerType, InternalString, LogStore, LoroCore, LoroError, LoroValue, VersionVector,
};
use super::encode_changes::{ChangeEncoding, DepsEncoding};
@ -84,7 +84,7 @@ impl StateContent {
}
impl EncodedStateContent {
pub fn into_state(self, keys: &[InternalString], clients: &Clients) -> StateContent {
pub fn into_state(self, keys: &[InternalString], clients: &[ClientID]) -> StateContent {
match self {
EncodedStateContent::List { pool, state_len } => StateContent::List { pool, state_len },
EncodedStateContent::Map {
@ -420,52 +420,111 @@ pub(super) fn decode_snapshot(
},
None => false,
};
if can_load {
let mut frontiers = vv.clone();
for (_, changes) in changes.iter() {
for change in changes.iter() {
remove_included_frontiers(&mut frontiers, &change.deps);
}
}
// rebuild states by snapshot
let mut import_context = ImportContext {
old_frontiers: smallvec![],
new_frontiers: frontiers.get_frontiers(),
old_vv: VersionVector::new(),
spans: vv.diff(&store.vv).left,
new_vv: vv.clone(),
diff: Default::default(),
patched_old_vv: None,
};
for (container_id, pool_mapping) in containers.into_iter().zip(container_states.into_iter())
{
let container_idx = store.reg.get_or_create_container_idx(&container_id);
container_idx2type.insert(container_idx, container_id.container_type());
let state = pool_mapping.into_state(&keys, &clients);
let container = store.reg.get_by_idx(container_idx).unwrap();
let mut container = container.try_lock().unwrap();
container.to_import_snapshot(state, hierarchy, &mut import_context);
}
store.latest_lamport = changes
.values()
.map(|changes| changes.last().unwrap().lamport_last())
.max()
.unwrap();
store.latest_timestamp = changes
.values()
.map(|changes| changes.last().unwrap().timestamp)
.max()
.unwrap();
store.changes = changes;
store.vv = vv;
store.frontiers = frontiers.get_frontiers();
// FIXME: events are wrong if store was not empty
let mut import_context = load_snapshot(
store,
hierarchy,
vv,
changes,
containers,
container_states,
container_idx2type,
&keys,
&clients,
);
Ok(store.get_events(hierarchy, &mut import_context))
} else {
todo!("load the diffing");
let new_loro = LoroCore::default();
let mut new_store = new_loro.log_store.try_write().unwrap();
let mut new_hierarchy = new_loro.hierarchy.try_lock().unwrap();
load_snapshot(
&mut new_store,
&mut new_hierarchy,
vv,
changes,
containers,
container_states,
container_idx2type,
&keys,
&clients,
);
let diff_changes = new_store.export(&store.vv);
Ok(store.import(hierarchy, diff_changes))
}
}
#[allow(clippy::too_many_arguments)]
fn load_snapshot(
new_store: &mut LogStore,
new_hierarchy: &mut Hierarchy,
vv: VersionVector,
changes: FxHashMap<ClientID, RleVecWithIndex<Change, ChangeMergeCfg>>,
containers: Vec<ContainerID>,
container_states: Vec<EncodedStateContent>,
mut container_idx2type: FxHashMap<ContainerIdx, ContainerType>,
keys: &[InternalString],
clients: &[u64],
) -> ImportContext {
let mut frontiers = vv.clone();
for (_, changes) in changes.iter() {
for change in changes.iter() {
remove_included_frontiers(&mut frontiers, &change.deps);
}
}
// rebuild states by snapshot
let mut import_context = ImportContext {
old_frontiers: smallvec![],
new_frontiers: frontiers.get_frontiers(),
old_vv: VersionVector::new(),
spans: vv.diff(&new_store.vv).left,
new_vv: vv.clone(),
diff: Default::default(),
patched_old_vv: None,
};
for (container_id, pool_mapping) in containers.into_iter().zip(container_states.into_iter()) {
let container_idx = new_store.reg.get_or_create_container_idx(&container_id);
container_idx2type.insert(container_idx, container_id.container_type());
let state = pool_mapping.into_state(keys, clients);
let container = new_store.reg.get_by_idx(container_idx).unwrap();
let mut container = container.try_lock().unwrap();
container.to_import_snapshot(state, new_hierarchy, &mut import_context);
}
new_store.latest_lamport = changes
.values()
.map(|changes| changes.last().unwrap().lamport_last())
.max()
.unwrap();
new_store.latest_timestamp = changes
.values()
.map(|changes| changes.last().unwrap().timestamp)
.max()
.unwrap();
new_store.changes = changes;
new_store.vv = vv;
new_store.frontiers = frontiers.get_frontiers();
import_context
}
#[cfg(test)]
mod test {
use crate::LoroCore;
#[test]
fn cannot_load() {
let mut loro = LoroCore::new(Default::default(), Some(1));
let mut text = loro.get_text("text");
text.insert(&loro, 0, "abc").unwrap();
let snapshot = loro.encode_all();
let mut loro2 = LoroCore::new(Default::default(), Some(2));
let mut text2 = loro2.get_text("text");
text2.insert(&loro2, 0, "efg").unwrap();
loro2.decode(&snapshot).unwrap();
assert_eq!(text2.get_value().to_json_pretty(), "\"abcefg\"");
}
}