mirror of
https://github.com/loro-dev/loro.git
synced 2025-02-06 12:25:03 +00:00
perf: opt encode/decode speed
This commit is contained in:
parent
ac9a7e0631
commit
9899a94f43
5 changed files with 238 additions and 234 deletions
|
@ -110,10 +110,23 @@ impl CompactBytes {
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut index = 0;
|
let mut index = 0;
|
||||||
let min_match_size = min_match_size.min(bytes.len());
|
|
||||||
while index < bytes.len() {
|
while index < bytes.len() {
|
||||||
|
if bytes.len() - index < min_match_size {
|
||||||
|
let old_len = self.bytes.len();
|
||||||
|
push_with_merge(
|
||||||
|
&mut ans,
|
||||||
|
self.bytes.len()..self.bytes.len() + bytes.len() - index,
|
||||||
|
);
|
||||||
|
self.bytes.push_slice(&bytes[index..]);
|
||||||
|
self.record_new_prefix(old_len);
|
||||||
|
break;
|
||||||
|
}
|
||||||
match self.lookup(&bytes[index..]) {
|
match self.lookup(&bytes[index..]) {
|
||||||
Some((pos, len)) if len >= min_match_size => {
|
Some((pos, len))
|
||||||
|
if len >= min_match_size
|
||||||
|
&& (len == bytes.len() - index
|
||||||
|
|| bytes.len() - index - len >= min_match_size) =>
|
||||||
|
{
|
||||||
push_with_merge(&mut ans, pos..pos + len);
|
push_with_merge(&mut ans, pos..pos + len);
|
||||||
index += len;
|
index += len;
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,6 +110,7 @@ impl SharedArena {
|
||||||
|
|
||||||
pub fn alloc_values(&self, values: impl Iterator<Item = LoroValue>) -> std::ops::Range<usize> {
|
pub fn alloc_values(&self, values: impl Iterator<Item = LoroValue>) -> std::ops::Range<usize> {
|
||||||
let mut values_lock = self.values.lock().unwrap();
|
let mut values_lock = self.values.lock().unwrap();
|
||||||
|
values_lock.reserve(values.size_hint().0);
|
||||||
let start = values_lock.len();
|
let start = values_lock.len();
|
||||||
for value in values {
|
for value in values {
|
||||||
values_lock.push(value);
|
values_lock.push(value);
|
||||||
|
|
|
@ -119,7 +119,8 @@ impl OpLog {
|
||||||
///
|
///
|
||||||
/// # Err
|
/// # Err
|
||||||
///
|
///
|
||||||
/// Return Err(LoroError::UsedOpID) when the change's id is occupied
|
/// - Return Err(LoroError::UsedOpID) when the change's id is occupied
|
||||||
|
/// - Return Err(LoroError::DecodeError) when the change's deps are missing
|
||||||
pub fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
|
pub fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
|
||||||
self.check_id_valid(change.id)?;
|
self.check_id_valid(change.id)?;
|
||||||
if let Err(id) = self.check_deps(&change.deps) {
|
if let Err(id) = self.check_deps(&change.deps) {
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
|
|
||||||
use std::{borrow::Cow, collections::VecDeque, mem::take, sync::Arc};
|
use std::{borrow::Cow, collections::VecDeque, mem::take, sync::Arc};
|
||||||
|
|
||||||
use compact_bytes::CompactBytes;
|
|
||||||
use debug_log::debug_dbg;
|
use debug_log::debug_dbg;
|
||||||
use fxhash::{FxHashMap, FxHashSet};
|
use fxhash::{FxHashMap, FxHashSet};
|
||||||
use loro_common::{HasLamport, ID};
|
use loro_common::{HasLamport, ID};
|
||||||
|
@ -35,13 +34,208 @@ use super::{
|
||||||
state::{AppState, AppStateDiff, ListState, MapState, State, TextState},
|
state::{AppState, AppStateDiff, ListState, MapState, State, TextState},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub fn encode_app_snapshot(app: &LoroApp) -> Vec<u8> {
|
||||||
|
let pre_encoded_state = preprocess_app_state(&app.app_state().lock().unwrap());
|
||||||
|
let f = encode_oplog(&app.oplog().lock().unwrap(), Some(pre_encoded_state));
|
||||||
|
// f.diagnose_size();
|
||||||
|
miniz_oxide::deflate::compress_to_vec(&f.encode(), 6)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_app_snapshot(app: &LoroApp, bytes: &[u8]) -> Result<(), LoroError> {
|
||||||
|
assert!(app.is_empty());
|
||||||
|
let bytes = miniz_oxide::inflate::decompress_to_vec(bytes).unwrap();
|
||||||
|
let data = FinalPhase::decode(&bytes)?;
|
||||||
|
let mut app_state = app.app_state().lock().unwrap();
|
||||||
|
decode_state(&mut app_state, &data)?;
|
||||||
|
let arena = app_state.arena.clone();
|
||||||
|
let oplog = decode_oplog(&mut app.oplog().lock().unwrap(), &data, Some(arena))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_oplog(
|
||||||
|
oplog: &mut OpLog,
|
||||||
|
data: &FinalPhase,
|
||||||
|
arena: Option<SharedArena>,
|
||||||
|
) -> Result<(), LoroError> {
|
||||||
|
let arena = arena.unwrap_or_else(SharedArena::default);
|
||||||
|
oplog.arena = arena.clone();
|
||||||
|
let state_arena = TempArena::decode_state_arena(&data)?;
|
||||||
|
let mut extra_arena = TempArena::decode_additional_arena(&data)?;
|
||||||
|
arena.alloc_str_fast(&*extra_arena.text);
|
||||||
|
arena.alloc_values(state_arena.values.into_iter());
|
||||||
|
arena.alloc_values(extra_arena.values.into_iter());
|
||||||
|
let mut keys = state_arena.keywords;
|
||||||
|
keys.append(&mut extra_arena.keywords);
|
||||||
|
|
||||||
|
let common = CommonArena::decode(&data)?;
|
||||||
|
let oplog_data = OplogEncoded::decode(data)?;
|
||||||
|
|
||||||
|
let mut changes = Vec::new();
|
||||||
|
let mut dep_iter = oplog_data.deps.iter();
|
||||||
|
let mut op_iter = oplog_data.ops.iter();
|
||||||
|
let mut counters = FxHashMap::default();
|
||||||
|
for change in oplog_data.changes.iter() {
|
||||||
|
let peer_idx = change.peer_idx as usize;
|
||||||
|
let peer_id = common.peer_ids[peer_idx];
|
||||||
|
let timestamp = change.timestamp;
|
||||||
|
let deps_len = change.deps_len;
|
||||||
|
let dep_on_self = change.dep_on_self;
|
||||||
|
let mut ops = RleVec::new();
|
||||||
|
let counter_mut = counters.entry(peer_idx).or_insert(0);
|
||||||
|
let start_counter = *counter_mut;
|
||||||
|
|
||||||
|
// calc ops
|
||||||
|
let mut total_len = 0;
|
||||||
|
for _ in 0..change.op_len {
|
||||||
|
// calc op
|
||||||
|
let id = ID::new(peer_id, *counter_mut);
|
||||||
|
let encoded_op = op_iter.next().unwrap();
|
||||||
|
let container = common.container_ids[encoded_op.container as usize].clone();
|
||||||
|
let container_idx = arena.register_container(&container);
|
||||||
|
let op = match container.container_type() {
|
||||||
|
loro_common::ContainerType::Text | loro_common::ContainerType::List => {
|
||||||
|
let op = encoded_op.get_list();
|
||||||
|
match op {
|
||||||
|
SnapshotOp::ListInsert { start, len, pos } => Op::new(
|
||||||
|
id,
|
||||||
|
InnerContent::List(InnerListOp::new_insert(start..start + len, pos)),
|
||||||
|
container_idx,
|
||||||
|
),
|
||||||
|
SnapshotOp::ListDelete { len, pos } => Op::new(
|
||||||
|
id,
|
||||||
|
InnerContent::List(InnerListOp::new_del(pos, len)),
|
||||||
|
container_idx,
|
||||||
|
),
|
||||||
|
SnapshotOp::ListUnknown { len, pos } => Op::new(
|
||||||
|
id,
|
||||||
|
InnerContent::List(InnerListOp::new_unknown(pos, len)),
|
||||||
|
container_idx,
|
||||||
|
),
|
||||||
|
SnapshotOp::Map { .. } => {
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
loro_common::ContainerType::Map => {
|
||||||
|
let op = encoded_op.get_map();
|
||||||
|
match op {
|
||||||
|
SnapshotOp::Map {
|
||||||
|
key,
|
||||||
|
value_idx_plus_one,
|
||||||
|
} => Op::new(
|
||||||
|
id,
|
||||||
|
InnerContent::Map(InnerMapSet {
|
||||||
|
key: (&*keys[key]).into(),
|
||||||
|
value: value_idx_plus_one - 1,
|
||||||
|
}),
|
||||||
|
container_idx,
|
||||||
|
),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
*counter_mut += op.content_len() as Counter;
|
||||||
|
ops.push(op);
|
||||||
|
}
|
||||||
|
|
||||||
|
// calc deps
|
||||||
|
let mut deps: smallvec::SmallVec<[ID; 2]> = smallvec![];
|
||||||
|
if dep_on_self {
|
||||||
|
deps.push(ID::new(peer_id, start_counter - 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
for _ in 0..deps_len {
|
||||||
|
let dep = dep_iter.next().unwrap();
|
||||||
|
let peer = common.peer_ids[dep.peer_idx as usize];
|
||||||
|
deps.push(ID::new(peer, dep.counter));
|
||||||
|
}
|
||||||
|
|
||||||
|
changes.push(Change {
|
||||||
|
deps: Frontiers::from(deps),
|
||||||
|
ops,
|
||||||
|
timestamp,
|
||||||
|
id: ID::new(peer_id, start_counter),
|
||||||
|
lamport: 0, // calculate lamport when importing
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// we assume changes are already sorted by lamport already
|
||||||
|
for mut change in changes {
|
||||||
|
let lamport = oplog.dag.frontiers_to_next_lamport(&change.deps);
|
||||||
|
change.lamport = lamport;
|
||||||
|
oplog.import_local_change(change)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_state(app_state: &mut AppState, data: &FinalPhase) -> Result<(), LoroError> {
|
||||||
|
assert!(app_state.is_empty());
|
||||||
|
assert!(!app_state.is_in_txn());
|
||||||
|
let arena = app_state.arena.clone();
|
||||||
|
let common = CommonArena::decode(&data)?;
|
||||||
|
let state_arena = TempArena::decode_state_arena(&data)?;
|
||||||
|
let encoded_app_state = EncodedAppState::decode(&data)?;
|
||||||
|
app_state.frontiers = Frontiers::from(&encoded_app_state.frontiers);
|
||||||
|
let mut text_index = 0;
|
||||||
|
// this part should be moved to encode.rs in preload
|
||||||
|
for ((id, parent), state) in common
|
||||||
|
.container_ids
|
||||||
|
.iter()
|
||||||
|
.zip(encoded_app_state.parents.iter())
|
||||||
|
.zip(encoded_app_state.states.iter())
|
||||||
|
{
|
||||||
|
let idx = arena.register_container(id);
|
||||||
|
let parent_idx =
|
||||||
|
(*parent).map(|x| ContainerIdx::from_index_and_type(x, state.container_type()));
|
||||||
|
arena.set_parent(idx, parent_idx);
|
||||||
|
match state {
|
||||||
|
loro_preload::EncodedContainerState::Text { len } => {
|
||||||
|
let index = text_index;
|
||||||
|
app_state.set_state(
|
||||||
|
idx,
|
||||||
|
State::TextState(TextState::from_str(
|
||||||
|
std::str::from_utf8(&state_arena.text[index..index + len]).unwrap(),
|
||||||
|
)),
|
||||||
|
);
|
||||||
|
text_index += len;
|
||||||
|
}
|
||||||
|
loro_preload::EncodedContainerState::Map(map_data) => {
|
||||||
|
let mut map = MapState::new();
|
||||||
|
for entry in map_data.iter() {
|
||||||
|
map.insert(
|
||||||
|
InternalString::from(&*state_arena.keywords[entry.key]),
|
||||||
|
MapValue {
|
||||||
|
counter: entry.counter as Counter,
|
||||||
|
value: if entry.value == 0 {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(state_arena.values[entry.value as usize - 1].clone())
|
||||||
|
},
|
||||||
|
lamport: (entry.lamport, common.peer_ids[entry.peer as usize]),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
app_state.set_state(idx, State::MapState(map));
|
||||||
|
}
|
||||||
|
loro_preload::EncodedContainerState::List(list_data) => {
|
||||||
|
let mut list = ListState::new();
|
||||||
|
list.insert_batch(0, list_data.iter().map(|&x| state_arena.values[x].clone()));
|
||||||
|
app_state.set_state(idx, State::ListState(list));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
type Containers = Vec<ContainerID>;
|
type Containers = Vec<ContainerID>;
|
||||||
type ClientIdx = u32;
|
type ClientIdx = u32;
|
||||||
type Clients = Vec<PeerID>;
|
type Clients = Vec<PeerID>;
|
||||||
|
|
||||||
#[columnar(ser, de)]
|
#[columnar(ser, de)]
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub(super) struct OplogEncoded {
|
struct OplogEncoded {
|
||||||
#[columnar(type = "vec")]
|
#[columnar(type = "vec")]
|
||||||
pub(crate) changes: Vec<EncodedChange>,
|
pub(crate) changes: Vec<EncodedChange>,
|
||||||
#[columnar(type = "vec")]
|
#[columnar(type = "vec")]
|
||||||
|
@ -63,7 +257,7 @@ impl OplogEncoded {
|
||||||
|
|
||||||
#[columnar(vec, ser, de)]
|
#[columnar(vec, ser, de)]
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct EncodedChange {
|
struct EncodedChange {
|
||||||
#[columnar(strategy = "Rle", original_type = "u32")]
|
#[columnar(strategy = "Rle", original_type = "u32")]
|
||||||
pub(super) peer_idx: ClientIdx,
|
pub(super) peer_idx: ClientIdx,
|
||||||
#[columnar(strategy = "DeltaRle", original_type = "i64")]
|
#[columnar(strategy = "DeltaRle", original_type = "i64")]
|
||||||
|
@ -167,11 +361,11 @@ impl EncodedSnapshotOp {
|
||||||
|
|
||||||
#[columnar(vec, ser, de)]
|
#[columnar(vec, ser, de)]
|
||||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
|
||||||
pub(super) struct DepsEncoding {
|
struct DepsEncoding {
|
||||||
#[columnar(strategy = "Rle", original_type = "u32")]
|
#[columnar(strategy = "Rle", original_type = "u32")]
|
||||||
pub(super) peer_idx: ClientIdx,
|
peer_idx: ClientIdx,
|
||||||
#[columnar(strategy = "DeltaRle", original_type = "i32")]
|
#[columnar(strategy = "DeltaRle", original_type = "i32")]
|
||||||
pub(super) counter: Counter,
|
counter: Counter,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DepsEncoding {
|
impl DepsEncoding {
|
||||||
|
@ -183,24 +377,6 @@ impl DepsEncoding {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn encode_app_snapshot(app: &LoroApp) -> Vec<u8> {
|
|
||||||
let pre_encoded_state = preprocess_app_state(&app.app_state().lock().unwrap());
|
|
||||||
let f = encode_oplog(&app.oplog().lock().unwrap(), Some(pre_encoded_state));
|
|
||||||
// f.diagnose_size();
|
|
||||||
miniz_oxide::deflate::compress_to_vec(&f.encode(), 6)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn decode_app_snapshot(app: &LoroApp, bytes: &[u8]) -> Result<(), LoroError> {
|
|
||||||
assert!(app.is_empty());
|
|
||||||
let bytes = miniz_oxide::inflate::decompress_to_vec(bytes).unwrap();
|
|
||||||
let data = FinalPhase::decode(&bytes)?;
|
|
||||||
let mut app_state = app.app_state().lock().unwrap();
|
|
||||||
decode_state(&mut app_state, &data)?;
|
|
||||||
let arena = app_state.arena.clone();
|
|
||||||
let oplog = decode_oplog(&mut app.oplog().lock().unwrap(), &data, Some(arena))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct PreEncodedState {
|
struct PreEncodedState {
|
||||||
common: CommonArena<'static>,
|
common: CommonArena<'static>,
|
||||||
|
@ -336,8 +512,7 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option<PreEncodedState>) -> FinalPhase
|
||||||
if common.container_ids.is_empty() {
|
if common.container_ids.is_empty() {
|
||||||
common.container_ids = oplog.arena.export_containers();
|
common.container_ids = oplog.arena.export_containers();
|
||||||
}
|
}
|
||||||
let mut bytes = CompactBytes::new();
|
let mut bytes = Vec::with_capacity(arena.text.len());
|
||||||
bytes.append(&arena.text);
|
|
||||||
let mut extra_keys = Vec::new();
|
let mut extra_keys = Vec::new();
|
||||||
let mut extra_values = Vec::new();
|
let mut extra_values = Vec::new();
|
||||||
|
|
||||||
|
@ -375,19 +550,14 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option<PreEncodedState>) -> FinalPhase
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut record_str = |s: &[u8], mut pos: usize, container_idx: u32| {
|
let mut record_str = |s: &[u8], mut pos: usize, container_idx: u32| {
|
||||||
let slices = [bytes.alloc(s)]; // PERF: We can optimize this further
|
let mut start_idx = bytes.len();
|
||||||
slices
|
let slice = bytes.extend_from_slice(s);
|
||||||
.into_iter()
|
let ans = SnapshotOp::ListInsert {
|
||||||
.map(|range| {
|
pos,
|
||||||
let ans = SnapshotOp::ListInsert {
|
start: start_idx as u32,
|
||||||
pos,
|
len: s.len() as u32,
|
||||||
start: range.start() as u32,
|
};
|
||||||
len: range.len() as u32,
|
EncodedSnapshotOp::from(ans, container_idx)
|
||||||
};
|
|
||||||
pos += range.len();
|
|
||||||
EncodedSnapshotOp::from(ans, container_idx)
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Add all changes
|
// Add all changes
|
||||||
|
@ -427,7 +597,7 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option<PreEncodedState>) -> FinalPhase
|
||||||
let slice = oplog
|
let slice = oplog
|
||||||
.arena
|
.arena
|
||||||
.slice_bytes(slice.0.start as usize..slice.0.end as usize);
|
.slice_bytes(slice.0.start as usize..slice.0.end as usize);
|
||||||
encoded_ops.extend(record_str(
|
encoded_ops.push(record_str(
|
||||||
&slice,
|
&slice,
|
||||||
*pos as usize,
|
*pos as usize,
|
||||||
op.container.to_index(),
|
op.container.to_index(),
|
||||||
|
@ -511,8 +681,6 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option<PreEncodedState>) -> FinalPhase
|
||||||
}
|
}
|
||||||
|
|
||||||
common.peer_ids = Cow::Owned(peers);
|
common.peer_ids = Cow::Owned(peers);
|
||||||
let bytes = bytes.take();
|
|
||||||
let extra_text = &bytes[arena.text.len()..];
|
|
||||||
let oplog_encoded = OplogEncoded {
|
let oplog_encoded = OplogEncoded {
|
||||||
changes: encoded_changes,
|
changes: encoded_changes,
|
||||||
ops: encoded_ops,
|
ops: encoded_ops,
|
||||||
|
@ -527,9 +695,9 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option<PreEncodedState>) -> FinalPhase
|
||||||
common: Cow::Owned(common.encode()),
|
common: Cow::Owned(common.encode()),
|
||||||
app_state: Cow::Owned(app_state.encode()),
|
app_state: Cow::Owned(app_state.encode()),
|
||||||
state_arena: Cow::Owned(arena.encode()),
|
state_arena: Cow::Owned(arena.encode()),
|
||||||
additional_arena: Cow::Owned(
|
oplog_extra_arena: Cow::Owned(
|
||||||
TempArena {
|
TempArena {
|
||||||
text: Cow::Borrowed(extra_text),
|
text: Cow::Borrowed(&bytes),
|
||||||
keywords: extra_keys,
|
keywords: extra_keys,
|
||||||
values: extra_values,
|
values: extra_values,
|
||||||
}
|
}
|
||||||
|
@ -541,185 +709,6 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option<PreEncodedState>) -> FinalPhase
|
||||||
ans
|
ans
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn decode_oplog(
|
|
||||||
oplog: &mut OpLog,
|
|
||||||
data: &FinalPhase,
|
|
||||||
arena: Option<SharedArena>,
|
|
||||||
) -> Result<(), LoroError> {
|
|
||||||
let arena = arena.unwrap_or_else(SharedArena::default);
|
|
||||||
oplog.arena = arena.clone();
|
|
||||||
let state_arena = TempArena::decode_state_arena(&data)?;
|
|
||||||
let mut extra_arena = TempArena::decode_additional_arena(&data)?;
|
|
||||||
arena.alloc_str_fast(&*state_arena.text);
|
|
||||||
arena.alloc_str_fast(&*extra_arena.text);
|
|
||||||
arena.alloc_values(state_arena.values.into_iter());
|
|
||||||
arena.alloc_values(extra_arena.values.into_iter());
|
|
||||||
let mut keys = state_arena.keywords;
|
|
||||||
keys.append(&mut extra_arena.keywords);
|
|
||||||
|
|
||||||
let common = CommonArena::decode(&data)?;
|
|
||||||
let oplog_data = OplogEncoded::decode(data)?;
|
|
||||||
|
|
||||||
let mut changes = Vec::new();
|
|
||||||
let mut dep_iter = oplog_data.deps.iter();
|
|
||||||
let mut op_iter = oplog_data.ops.iter();
|
|
||||||
let mut counters = FxHashMap::default();
|
|
||||||
for change in oplog_data.changes.iter() {
|
|
||||||
let peer_idx = change.peer_idx as usize;
|
|
||||||
let peer_id = common.peer_ids[peer_idx];
|
|
||||||
let timestamp = change.timestamp;
|
|
||||||
let deps_len = change.deps_len;
|
|
||||||
let dep_on_self = change.dep_on_self;
|
|
||||||
let mut ops = RleVec::new();
|
|
||||||
let counter_mut = counters.entry(peer_idx).or_insert(0);
|
|
||||||
let start_counter = *counter_mut;
|
|
||||||
|
|
||||||
// calc ops
|
|
||||||
let mut total_len = 0;
|
|
||||||
for _ in 0..change.op_len {
|
|
||||||
// calc op
|
|
||||||
let id = ID::new(peer_id, *counter_mut);
|
|
||||||
let encoded_op = op_iter.next().unwrap();
|
|
||||||
let container = common.container_ids[encoded_op.container as usize].clone();
|
|
||||||
let container_idx = arena.register_container(&container);
|
|
||||||
let op = match container.container_type() {
|
|
||||||
loro_common::ContainerType::Text | loro_common::ContainerType::List => {
|
|
||||||
let op = encoded_op.get_list();
|
|
||||||
match op {
|
|
||||||
SnapshotOp::ListInsert { start, len, pos } => Op::new(
|
|
||||||
id,
|
|
||||||
InnerContent::List(InnerListOp::new_insert(start..start + len, pos)),
|
|
||||||
container_idx,
|
|
||||||
),
|
|
||||||
SnapshotOp::ListDelete { len, pos } => Op::new(
|
|
||||||
id,
|
|
||||||
InnerContent::List(InnerListOp::new_del(pos, len)),
|
|
||||||
container_idx,
|
|
||||||
),
|
|
||||||
SnapshotOp::ListUnknown { len, pos } => Op::new(
|
|
||||||
id,
|
|
||||||
InnerContent::List(InnerListOp::new_unknown(pos, len)),
|
|
||||||
container_idx,
|
|
||||||
),
|
|
||||||
SnapshotOp::Map { .. } => {
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
loro_common::ContainerType::Map => {
|
|
||||||
let op = encoded_op.get_map();
|
|
||||||
match op {
|
|
||||||
SnapshotOp::Map {
|
|
||||||
key,
|
|
||||||
value_idx_plus_one,
|
|
||||||
} => Op::new(
|
|
||||||
id,
|
|
||||||
InnerContent::Map(InnerMapSet {
|
|
||||||
key: (&*keys[key]).into(),
|
|
||||||
value: value_idx_plus_one - 1,
|
|
||||||
}),
|
|
||||||
container_idx,
|
|
||||||
),
|
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
*counter_mut += op.content_len() as Counter;
|
|
||||||
ops.push(op);
|
|
||||||
}
|
|
||||||
|
|
||||||
// calc deps
|
|
||||||
let mut deps: smallvec::SmallVec<[ID; 2]> = smallvec![];
|
|
||||||
if dep_on_self {
|
|
||||||
assert!(start_counter > 0);
|
|
||||||
deps.push(ID::new(peer_id, start_counter - 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
for _ in 0..deps_len {
|
|
||||||
let dep = dep_iter.next().unwrap();
|
|
||||||
let peer = common.peer_ids[dep.peer_idx as usize];
|
|
||||||
deps.push(ID::new(peer, dep.counter));
|
|
||||||
}
|
|
||||||
|
|
||||||
changes.push(Change {
|
|
||||||
deps: Frontiers::from(deps),
|
|
||||||
ops,
|
|
||||||
timestamp,
|
|
||||||
id: ID::new(peer_id, start_counter),
|
|
||||||
lamport: 0, // calculate lamport when importing
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// we assume changes are already sorted by lamport already
|
|
||||||
for mut change in changes {
|
|
||||||
let lamport = oplog.dag.frontiers_to_next_lamport(&change.deps);
|
|
||||||
change.lamport = lamport;
|
|
||||||
oplog.import_local_change(change)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn decode_state(app_state: &mut AppState, data: &FinalPhase) -> Result<(), LoroError> {
|
|
||||||
assert!(app_state.is_empty());
|
|
||||||
assert!(!app_state.is_in_txn());
|
|
||||||
let arena = app_state.arena.clone();
|
|
||||||
let common = CommonArena::decode(&data)?;
|
|
||||||
let state_arena = TempArena::decode_state_arena(&data)?;
|
|
||||||
let encoded_app_state = EncodedAppState::decode(&data)?;
|
|
||||||
app_state.frontiers = Frontiers::from(&encoded_app_state.frontiers);
|
|
||||||
let mut text_index = 0;
|
|
||||||
// this part should be moved to encode.rs in preload
|
|
||||||
for ((id, parent), state) in common
|
|
||||||
.container_ids
|
|
||||||
.iter()
|
|
||||||
.zip(encoded_app_state.parents.iter())
|
|
||||||
.zip(encoded_app_state.states.iter())
|
|
||||||
{
|
|
||||||
let idx = arena.register_container(id);
|
|
||||||
let parent_idx =
|
|
||||||
(*parent).map(|x| ContainerIdx::from_index_and_type(x, state.container_type()));
|
|
||||||
arena.set_parent(idx, parent_idx);
|
|
||||||
match state {
|
|
||||||
loro_preload::EncodedContainerState::Text { len } => {
|
|
||||||
let index = text_index;
|
|
||||||
app_state.set_state(
|
|
||||||
idx,
|
|
||||||
State::TextState(TextState::from_str(
|
|
||||||
std::str::from_utf8(&state_arena.text[index..index + len]).unwrap(),
|
|
||||||
)),
|
|
||||||
);
|
|
||||||
text_index += len;
|
|
||||||
}
|
|
||||||
loro_preload::EncodedContainerState::Map(map_data) => {
|
|
||||||
let mut map = MapState::new();
|
|
||||||
for entry in map_data.iter() {
|
|
||||||
map.insert(
|
|
||||||
InternalString::from(&*state_arena.keywords[entry.key]),
|
|
||||||
MapValue {
|
|
||||||
counter: entry.counter as Counter,
|
|
||||||
value: if entry.value == 0 {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(state_arena.values[entry.value as usize - 1].clone())
|
|
||||||
},
|
|
||||||
lamport: (entry.lamport, common.peer_ids[entry.peer as usize]),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
app_state.set_state(idx, State::MapState(map));
|
|
||||||
}
|
|
||||||
loro_preload::EncodedContainerState::List(list_data) => {
|
|
||||||
let mut list = ListState::new();
|
|
||||||
list.insert_batch(0, list_data.iter().map(|&x| state_arena.values[x].clone()));
|
|
||||||
app_state.set_state(idx, State::ListState(list));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -732,7 +721,7 @@ mod test {
|
||||||
common: Cow::Owned(vec![0, 1, 2, 253, 254, 255]),
|
common: Cow::Owned(vec![0, 1, 2, 253, 254, 255]),
|
||||||
app_state: Cow::Owned(vec![255]),
|
app_state: Cow::Owned(vec![255]),
|
||||||
state_arena: Cow::Owned(vec![255]),
|
state_arena: Cow::Owned(vec![255]),
|
||||||
additional_arena: Cow::Owned(vec![255]),
|
oplog_extra_arena: Cow::Owned(vec![255]),
|
||||||
oplog: Cow::Owned(vec![255]),
|
oplog: Cow::Owned(vec![255]),
|
||||||
}
|
}
|
||||||
.encode());
|
.encode());
|
||||||
|
|
|
@ -14,7 +14,7 @@ pub struct FinalPhase<'a> {
|
||||||
#[serde(borrow)]
|
#[serde(borrow)]
|
||||||
pub state_arena: Cow<'a, [u8]>, // -> TempArena<'a>
|
pub state_arena: Cow<'a, [u8]>, // -> TempArena<'a>
|
||||||
#[serde(borrow)]
|
#[serde(borrow)]
|
||||||
pub additional_arena: Cow<'a, [u8]>, // -> TempArena<'a>,抛弃这部分则不能回溯历史
|
pub oplog_extra_arena: Cow<'a, [u8]>, // -> TempArena<'a>,抛弃这部分则不能回溯历史
|
||||||
#[serde(borrow)]
|
#[serde(borrow)]
|
||||||
pub oplog: Cow<'a, [u8]>, // -> OpLog. Can be ignored if we only need state
|
pub oplog: Cow<'a, [u8]>, // -> OpLog. Can be ignored if we only need state
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ impl<'a> FinalPhase<'a> {
|
||||||
self.common.len()
|
self.common.len()
|
||||||
+ self.app_state.len()
|
+ self.app_state.len()
|
||||||
+ self.state_arena.len()
|
+ self.state_arena.len()
|
||||||
+ self.additional_arena.len()
|
+ self.oplog_extra_arena.len()
|
||||||
+ self.oplog.len()
|
+ self.oplog.len()
|
||||||
+ 10,
|
+ 10,
|
||||||
);
|
);
|
||||||
|
@ -37,8 +37,8 @@ impl<'a> FinalPhase<'a> {
|
||||||
bytes.put_slice(&self.app_state);
|
bytes.put_slice(&self.app_state);
|
||||||
leb::write_unsigned(&mut bytes, self.state_arena.len() as u64);
|
leb::write_unsigned(&mut bytes, self.state_arena.len() as u64);
|
||||||
bytes.put_slice(&self.state_arena);
|
bytes.put_slice(&self.state_arena);
|
||||||
leb::write_unsigned(&mut bytes, self.additional_arena.len() as u64);
|
leb::write_unsigned(&mut bytes, self.oplog_extra_arena.len() as u64);
|
||||||
bytes.put_slice(&self.additional_arena);
|
bytes.put_slice(&self.oplog_extra_arena);
|
||||||
leb::write_unsigned(&mut bytes, self.oplog.len() as u64);
|
leb::write_unsigned(&mut bytes, self.oplog.len() as u64);
|
||||||
bytes.put_slice(&self.oplog);
|
bytes.put_slice(&self.oplog);
|
||||||
bytes.to_vec()
|
bytes.to_vec()
|
||||||
|
@ -70,7 +70,7 @@ impl<'a> FinalPhase<'a> {
|
||||||
common: Cow::Borrowed(common),
|
common: Cow::Borrowed(common),
|
||||||
app_state: Cow::Borrowed(app_state),
|
app_state: Cow::Borrowed(app_state),
|
||||||
state_arena: Cow::Borrowed(state_arena),
|
state_arena: Cow::Borrowed(state_arena),
|
||||||
additional_arena: Cow::Borrowed(additional_arena),
|
oplog_extra_arena: Cow::Borrowed(additional_arena),
|
||||||
oplog: Cow::Borrowed(oplog),
|
oplog: Cow::Borrowed(oplog),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -79,7 +79,7 @@ impl<'a> FinalPhase<'a> {
|
||||||
println!("common: {}", self.common.len());
|
println!("common: {}", self.common.len());
|
||||||
println!("app_state: {}", self.app_state.len());
|
println!("app_state: {}", self.app_state.len());
|
||||||
println!("state_arena: {}", self.state_arena.len());
|
println!("state_arena: {}", self.state_arena.len());
|
||||||
println!("additional_arena: {}", self.additional_arena.len());
|
println!("additional_arena: {}", self.oplog_extra_arena.len());
|
||||||
println!("oplog: {}", self.oplog.len());
|
println!("oplog: {}", self.oplog.len());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -167,7 +167,7 @@ impl<'a> TempArena<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn decode_additional_arena(data: &'a FinalPhase) -> Result<Self, LoroError> {
|
pub fn decode_additional_arena(data: &'a FinalPhase) -> Result<Self, LoroError> {
|
||||||
serde_columnar::from_bytes(&data.additional_arena)
|
serde_columnar::from_bytes(&data.oplog_extra_arena)
|
||||||
.map_err(|e| LoroError::DecodeError(e.to_string().into_boxed_str()))
|
.map_err(|e| LoroError::DecodeError(e.to_string().into_boxed_str()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue