From 9899a94f43a1aaa7a22c0706b6c632fbeb6e352a Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 18 Jul 2023 18:24:51 +0800 Subject: [PATCH] perf: opt encode/decode speed --- crates/compact-bytes/src/lib.rs | 17 +- crates/loro-internal/src/refactor/arena.rs | 1 + crates/loro-internal/src/refactor/oplog.rs | 3 +- .../src/refactor/snapshot_encode.rs | 437 +++++++++--------- crates/loro-preload/src/encode.rs | 14 +- 5 files changed, 238 insertions(+), 234 deletions(-) diff --git a/crates/compact-bytes/src/lib.rs b/crates/compact-bytes/src/lib.rs index a9e2d0c5..d7aede36 100644 --- a/crates/compact-bytes/src/lib.rs +++ b/crates/compact-bytes/src/lib.rs @@ -110,10 +110,23 @@ impl CompactBytes { } let mut index = 0; - let min_match_size = min_match_size.min(bytes.len()); while index < bytes.len() { + if bytes.len() - index < min_match_size { + let old_len = self.bytes.len(); + push_with_merge( + &mut ans, + self.bytes.len()..self.bytes.len() + bytes.len() - index, + ); + self.bytes.push_slice(&bytes[index..]); + self.record_new_prefix(old_len); + break; + } match self.lookup(&bytes[index..]) { - Some((pos, len)) if len >= min_match_size => { + Some((pos, len)) + if len >= min_match_size + && (len == bytes.len() - index + || bytes.len() - index - len >= min_match_size) => + { push_with_merge(&mut ans, pos..pos + len); index += len; } diff --git a/crates/loro-internal/src/refactor/arena.rs b/crates/loro-internal/src/refactor/arena.rs index 70164bc2..04df03b3 100644 --- a/crates/loro-internal/src/refactor/arena.rs +++ b/crates/loro-internal/src/refactor/arena.rs @@ -110,6 +110,7 @@ impl SharedArena { pub fn alloc_values(&self, values: impl Iterator) -> std::ops::Range { let mut values_lock = self.values.lock().unwrap(); + values_lock.reserve(values.size_hint().0); let start = values_lock.len(); for value in values { values_lock.push(value); diff --git a/crates/loro-internal/src/refactor/oplog.rs b/crates/loro-internal/src/refactor/oplog.rs index 919e967d..d5d807de 100644 --- a/crates/loro-internal/src/refactor/oplog.rs +++ b/crates/loro-internal/src/refactor/oplog.rs @@ -119,7 +119,8 @@ impl OpLog { /// /// # Err /// - /// Return Err(LoroError::UsedOpID) when the change's id is occupied + /// - Return Err(LoroError::UsedOpID) when the change's id is occupied + /// - Return Err(LoroError::DecodeError) when the change's deps are missing pub fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> { self.check_id_valid(change.id)?; if let Err(id) = self.check_deps(&change.deps) { diff --git a/crates/loro-internal/src/refactor/snapshot_encode.rs b/crates/loro-internal/src/refactor/snapshot_encode.rs index 25a5b5c5..e8fd7fea 100644 --- a/crates/loro-internal/src/refactor/snapshot_encode.rs +++ b/crates/loro-internal/src/refactor/snapshot_encode.rs @@ -2,7 +2,6 @@ use std::{borrow::Cow, collections::VecDeque, mem::take, sync::Arc}; -use compact_bytes::CompactBytes; use debug_log::debug_dbg; use fxhash::{FxHashMap, FxHashSet}; use loro_common::{HasLamport, ID}; @@ -35,13 +34,208 @@ use super::{ state::{AppState, AppStateDiff, ListState, MapState, State, TextState}, }; +pub fn encode_app_snapshot(app: &LoroApp) -> Vec { + let pre_encoded_state = preprocess_app_state(&app.app_state().lock().unwrap()); + let f = encode_oplog(&app.oplog().lock().unwrap(), Some(pre_encoded_state)); + // f.diagnose_size(); + miniz_oxide::deflate::compress_to_vec(&f.encode(), 6) +} + +pub fn decode_app_snapshot(app: &LoroApp, bytes: &[u8]) -> Result<(), LoroError> { + assert!(app.is_empty()); + let bytes = miniz_oxide::inflate::decompress_to_vec(bytes).unwrap(); + let data = FinalPhase::decode(&bytes)?; + let mut app_state = app.app_state().lock().unwrap(); + decode_state(&mut app_state, &data)?; + let arena = app_state.arena.clone(); + let oplog = decode_oplog(&mut app.oplog().lock().unwrap(), &data, Some(arena))?; + Ok(()) +} + +pub fn decode_oplog( + oplog: &mut OpLog, + data: &FinalPhase, + arena: Option, +) -> Result<(), LoroError> { + let arena = arena.unwrap_or_else(SharedArena::default); + oplog.arena = arena.clone(); + let state_arena = TempArena::decode_state_arena(&data)?; + let mut extra_arena = TempArena::decode_additional_arena(&data)?; + arena.alloc_str_fast(&*extra_arena.text); + arena.alloc_values(state_arena.values.into_iter()); + arena.alloc_values(extra_arena.values.into_iter()); + let mut keys = state_arena.keywords; + keys.append(&mut extra_arena.keywords); + + let common = CommonArena::decode(&data)?; + let oplog_data = OplogEncoded::decode(data)?; + + let mut changes = Vec::new(); + let mut dep_iter = oplog_data.deps.iter(); + let mut op_iter = oplog_data.ops.iter(); + let mut counters = FxHashMap::default(); + for change in oplog_data.changes.iter() { + let peer_idx = change.peer_idx as usize; + let peer_id = common.peer_ids[peer_idx]; + let timestamp = change.timestamp; + let deps_len = change.deps_len; + let dep_on_self = change.dep_on_self; + let mut ops = RleVec::new(); + let counter_mut = counters.entry(peer_idx).or_insert(0); + let start_counter = *counter_mut; + + // calc ops + let mut total_len = 0; + for _ in 0..change.op_len { + // calc op + let id = ID::new(peer_id, *counter_mut); + let encoded_op = op_iter.next().unwrap(); + let container = common.container_ids[encoded_op.container as usize].clone(); + let container_idx = arena.register_container(&container); + let op = match container.container_type() { + loro_common::ContainerType::Text | loro_common::ContainerType::List => { + let op = encoded_op.get_list(); + match op { + SnapshotOp::ListInsert { start, len, pos } => Op::new( + id, + InnerContent::List(InnerListOp::new_insert(start..start + len, pos)), + container_idx, + ), + SnapshotOp::ListDelete { len, pos } => Op::new( + id, + InnerContent::List(InnerListOp::new_del(pos, len)), + container_idx, + ), + SnapshotOp::ListUnknown { len, pos } => Op::new( + id, + InnerContent::List(InnerListOp::new_unknown(pos, len)), + container_idx, + ), + SnapshotOp::Map { .. } => { + unreachable!() + } + } + } + loro_common::ContainerType::Map => { + let op = encoded_op.get_map(); + match op { + SnapshotOp::Map { + key, + value_idx_plus_one, + } => Op::new( + id, + InnerContent::Map(InnerMapSet { + key: (&*keys[key]).into(), + value: value_idx_plus_one - 1, + }), + container_idx, + ), + _ => unreachable!(), + } + } + }; + *counter_mut += op.content_len() as Counter; + ops.push(op); + } + + // calc deps + let mut deps: smallvec::SmallVec<[ID; 2]> = smallvec![]; + if dep_on_self { + deps.push(ID::new(peer_id, start_counter - 1)); + } + + for _ in 0..deps_len { + let dep = dep_iter.next().unwrap(); + let peer = common.peer_ids[dep.peer_idx as usize]; + deps.push(ID::new(peer, dep.counter)); + } + + changes.push(Change { + deps: Frontiers::from(deps), + ops, + timestamp, + id: ID::new(peer_id, start_counter), + lamport: 0, // calculate lamport when importing + }); + } + + // we assume changes are already sorted by lamport already + for mut change in changes { + let lamport = oplog.dag.frontiers_to_next_lamport(&change.deps); + change.lamport = lamport; + oplog.import_local_change(change)?; + } + + Ok(()) +} + +pub fn decode_state(app_state: &mut AppState, data: &FinalPhase) -> Result<(), LoroError> { + assert!(app_state.is_empty()); + assert!(!app_state.is_in_txn()); + let arena = app_state.arena.clone(); + let common = CommonArena::decode(&data)?; + let state_arena = TempArena::decode_state_arena(&data)?; + let encoded_app_state = EncodedAppState::decode(&data)?; + app_state.frontiers = Frontiers::from(&encoded_app_state.frontiers); + let mut text_index = 0; + // this part should be moved to encode.rs in preload + for ((id, parent), state) in common + .container_ids + .iter() + .zip(encoded_app_state.parents.iter()) + .zip(encoded_app_state.states.iter()) + { + let idx = arena.register_container(id); + let parent_idx = + (*parent).map(|x| ContainerIdx::from_index_and_type(x, state.container_type())); + arena.set_parent(idx, parent_idx); + match state { + loro_preload::EncodedContainerState::Text { len } => { + let index = text_index; + app_state.set_state( + idx, + State::TextState(TextState::from_str( + std::str::from_utf8(&state_arena.text[index..index + len]).unwrap(), + )), + ); + text_index += len; + } + loro_preload::EncodedContainerState::Map(map_data) => { + let mut map = MapState::new(); + for entry in map_data.iter() { + map.insert( + InternalString::from(&*state_arena.keywords[entry.key]), + MapValue { + counter: entry.counter as Counter, + value: if entry.value == 0 { + None + } else { + Some(state_arena.values[entry.value as usize - 1].clone()) + }, + lamport: (entry.lamport, common.peer_ids[entry.peer as usize]), + }, + ) + } + app_state.set_state(idx, State::MapState(map)); + } + loro_preload::EncodedContainerState::List(list_data) => { + let mut list = ListState::new(); + list.insert_batch(0, list_data.iter().map(|&x| state_arena.values[x].clone())); + app_state.set_state(idx, State::ListState(list)); + } + } + } + + Ok(()) +} + type Containers = Vec; type ClientIdx = u32; type Clients = Vec; #[columnar(ser, de)] #[derive(Debug, Serialize, Deserialize)] -pub(super) struct OplogEncoded { +struct OplogEncoded { #[columnar(type = "vec")] pub(crate) changes: Vec, #[columnar(type = "vec")] @@ -63,7 +257,7 @@ impl OplogEncoded { #[columnar(vec, ser, de)] #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EncodedChange { +struct EncodedChange { #[columnar(strategy = "Rle", original_type = "u32")] pub(super) peer_idx: ClientIdx, #[columnar(strategy = "DeltaRle", original_type = "i64")] @@ -167,11 +361,11 @@ impl EncodedSnapshotOp { #[columnar(vec, ser, de)] #[derive(Debug, Copy, Clone, Serialize, Deserialize)] -pub(super) struct DepsEncoding { +struct DepsEncoding { #[columnar(strategy = "Rle", original_type = "u32")] - pub(super) peer_idx: ClientIdx, + peer_idx: ClientIdx, #[columnar(strategy = "DeltaRle", original_type = "i32")] - pub(super) counter: Counter, + counter: Counter, } impl DepsEncoding { @@ -183,24 +377,6 @@ impl DepsEncoding { } } -pub fn encode_app_snapshot(app: &LoroApp) -> Vec { - let pre_encoded_state = preprocess_app_state(&app.app_state().lock().unwrap()); - let f = encode_oplog(&app.oplog().lock().unwrap(), Some(pre_encoded_state)); - // f.diagnose_size(); - miniz_oxide::deflate::compress_to_vec(&f.encode(), 6) -} - -pub fn decode_app_snapshot(app: &LoroApp, bytes: &[u8]) -> Result<(), LoroError> { - assert!(app.is_empty()); - let bytes = miniz_oxide::inflate::decompress_to_vec(bytes).unwrap(); - let data = FinalPhase::decode(&bytes)?; - let mut app_state = app.app_state().lock().unwrap(); - decode_state(&mut app_state, &data)?; - let arena = app_state.arena.clone(); - let oplog = decode_oplog(&mut app.oplog().lock().unwrap(), &data, Some(arena))?; - Ok(()) -} - #[derive(Default)] struct PreEncodedState { common: CommonArena<'static>, @@ -336,8 +512,7 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option) -> FinalPhase if common.container_ids.is_empty() { common.container_ids = oplog.arena.export_containers(); } - let mut bytes = CompactBytes::new(); - bytes.append(&arena.text); + let mut bytes = Vec::with_capacity(arena.text.len()); let mut extra_keys = Vec::new(); let mut extra_values = Vec::new(); @@ -375,19 +550,14 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option) -> FinalPhase }; let mut record_str = |s: &[u8], mut pos: usize, container_idx: u32| { - let slices = [bytes.alloc(s)]; // PERF: We can optimize this further - slices - .into_iter() - .map(|range| { - let ans = SnapshotOp::ListInsert { - pos, - start: range.start() as u32, - len: range.len() as u32, - }; - pos += range.len(); - EncodedSnapshotOp::from(ans, container_idx) - }) - .collect::>() + let mut start_idx = bytes.len(); + let slice = bytes.extend_from_slice(s); + let ans = SnapshotOp::ListInsert { + pos, + start: start_idx as u32, + len: s.len() as u32, + }; + EncodedSnapshotOp::from(ans, container_idx) }; // Add all changes @@ -427,7 +597,7 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option) -> FinalPhase let slice = oplog .arena .slice_bytes(slice.0.start as usize..slice.0.end as usize); - encoded_ops.extend(record_str( + encoded_ops.push(record_str( &slice, *pos as usize, op.container.to_index(), @@ -511,8 +681,6 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option) -> FinalPhase } common.peer_ids = Cow::Owned(peers); - let bytes = bytes.take(); - let extra_text = &bytes[arena.text.len()..]; let oplog_encoded = OplogEncoded { changes: encoded_changes, ops: encoded_ops, @@ -527,9 +695,9 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option) -> FinalPhase common: Cow::Owned(common.encode()), app_state: Cow::Owned(app_state.encode()), state_arena: Cow::Owned(arena.encode()), - additional_arena: Cow::Owned( + oplog_extra_arena: Cow::Owned( TempArena { - text: Cow::Borrowed(extra_text), + text: Cow::Borrowed(&bytes), keywords: extra_keys, values: extra_values, } @@ -541,185 +709,6 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option) -> FinalPhase ans } -pub fn decode_oplog( - oplog: &mut OpLog, - data: &FinalPhase, - arena: Option, -) -> Result<(), LoroError> { - let arena = arena.unwrap_or_else(SharedArena::default); - oplog.arena = arena.clone(); - let state_arena = TempArena::decode_state_arena(&data)?; - let mut extra_arena = TempArena::decode_additional_arena(&data)?; - arena.alloc_str_fast(&*state_arena.text); - arena.alloc_str_fast(&*extra_arena.text); - arena.alloc_values(state_arena.values.into_iter()); - arena.alloc_values(extra_arena.values.into_iter()); - let mut keys = state_arena.keywords; - keys.append(&mut extra_arena.keywords); - - let common = CommonArena::decode(&data)?; - let oplog_data = OplogEncoded::decode(data)?; - - let mut changes = Vec::new(); - let mut dep_iter = oplog_data.deps.iter(); - let mut op_iter = oplog_data.ops.iter(); - let mut counters = FxHashMap::default(); - for change in oplog_data.changes.iter() { - let peer_idx = change.peer_idx as usize; - let peer_id = common.peer_ids[peer_idx]; - let timestamp = change.timestamp; - let deps_len = change.deps_len; - let dep_on_self = change.dep_on_self; - let mut ops = RleVec::new(); - let counter_mut = counters.entry(peer_idx).or_insert(0); - let start_counter = *counter_mut; - - // calc ops - let mut total_len = 0; - for _ in 0..change.op_len { - // calc op - let id = ID::new(peer_id, *counter_mut); - let encoded_op = op_iter.next().unwrap(); - let container = common.container_ids[encoded_op.container as usize].clone(); - let container_idx = arena.register_container(&container); - let op = match container.container_type() { - loro_common::ContainerType::Text | loro_common::ContainerType::List => { - let op = encoded_op.get_list(); - match op { - SnapshotOp::ListInsert { start, len, pos } => Op::new( - id, - InnerContent::List(InnerListOp::new_insert(start..start + len, pos)), - container_idx, - ), - SnapshotOp::ListDelete { len, pos } => Op::new( - id, - InnerContent::List(InnerListOp::new_del(pos, len)), - container_idx, - ), - SnapshotOp::ListUnknown { len, pos } => Op::new( - id, - InnerContent::List(InnerListOp::new_unknown(pos, len)), - container_idx, - ), - SnapshotOp::Map { .. } => { - unreachable!() - } - } - } - loro_common::ContainerType::Map => { - let op = encoded_op.get_map(); - match op { - SnapshotOp::Map { - key, - value_idx_plus_one, - } => Op::new( - id, - InnerContent::Map(InnerMapSet { - key: (&*keys[key]).into(), - value: value_idx_plus_one - 1, - }), - container_idx, - ), - _ => unreachable!(), - } - } - }; - *counter_mut += op.content_len() as Counter; - ops.push(op); - } - - // calc deps - let mut deps: smallvec::SmallVec<[ID; 2]> = smallvec![]; - if dep_on_self { - assert!(start_counter > 0); - deps.push(ID::new(peer_id, start_counter - 1)); - } - - for _ in 0..deps_len { - let dep = dep_iter.next().unwrap(); - let peer = common.peer_ids[dep.peer_idx as usize]; - deps.push(ID::new(peer, dep.counter)); - } - - changes.push(Change { - deps: Frontiers::from(deps), - ops, - timestamp, - id: ID::new(peer_id, start_counter), - lamport: 0, // calculate lamport when importing - }); - } - - // we assume changes are already sorted by lamport already - for mut change in changes { - let lamport = oplog.dag.frontiers_to_next_lamport(&change.deps); - change.lamport = lamport; - oplog.import_local_change(change)?; - } - - Ok(()) -} - -pub fn decode_state(app_state: &mut AppState, data: &FinalPhase) -> Result<(), LoroError> { - assert!(app_state.is_empty()); - assert!(!app_state.is_in_txn()); - let arena = app_state.arena.clone(); - let common = CommonArena::decode(&data)?; - let state_arena = TempArena::decode_state_arena(&data)?; - let encoded_app_state = EncodedAppState::decode(&data)?; - app_state.frontiers = Frontiers::from(&encoded_app_state.frontiers); - let mut text_index = 0; - // this part should be moved to encode.rs in preload - for ((id, parent), state) in common - .container_ids - .iter() - .zip(encoded_app_state.parents.iter()) - .zip(encoded_app_state.states.iter()) - { - let idx = arena.register_container(id); - let parent_idx = - (*parent).map(|x| ContainerIdx::from_index_and_type(x, state.container_type())); - arena.set_parent(idx, parent_idx); - match state { - loro_preload::EncodedContainerState::Text { len } => { - let index = text_index; - app_state.set_state( - idx, - State::TextState(TextState::from_str( - std::str::from_utf8(&state_arena.text[index..index + len]).unwrap(), - )), - ); - text_index += len; - } - loro_preload::EncodedContainerState::Map(map_data) => { - let mut map = MapState::new(); - for entry in map_data.iter() { - map.insert( - InternalString::from(&*state_arena.keywords[entry.key]), - MapValue { - counter: entry.counter as Counter, - value: if entry.value == 0 { - None - } else { - Some(state_arena.values[entry.value as usize - 1].clone()) - }, - lamport: (entry.lamport, common.peer_ids[entry.peer as usize]), - }, - ) - } - app_state.set_state(idx, State::MapState(map)); - } - loro_preload::EncodedContainerState::List(list_data) => { - let mut list = ListState::new(); - list.insert_batch(0, list_data.iter().map(|&x| state_arena.values[x].clone())); - app_state.set_state(idx, State::ListState(list)); - } - } - } - - Ok(()) -} - #[cfg(test)] mod test { use super::*; @@ -732,7 +721,7 @@ mod test { common: Cow::Owned(vec![0, 1, 2, 253, 254, 255]), app_state: Cow::Owned(vec![255]), state_arena: Cow::Owned(vec![255]), - additional_arena: Cow::Owned(vec![255]), + oplog_extra_arena: Cow::Owned(vec![255]), oplog: Cow::Owned(vec![255]), } .encode()); diff --git a/crates/loro-preload/src/encode.rs b/crates/loro-preload/src/encode.rs index 63dedb15..3a4f60c2 100644 --- a/crates/loro-preload/src/encode.rs +++ b/crates/loro-preload/src/encode.rs @@ -14,7 +14,7 @@ pub struct FinalPhase<'a> { #[serde(borrow)] pub state_arena: Cow<'a, [u8]>, // -> TempArena<'a> #[serde(borrow)] - pub additional_arena: Cow<'a, [u8]>, // -> TempArena<'a>,抛弃这部分则不能回溯历史 + pub oplog_extra_arena: Cow<'a, [u8]>, // -> TempArena<'a>,抛弃这部分则不能回溯历史 #[serde(borrow)] pub oplog: Cow<'a, [u8]>, // -> OpLog. Can be ignored if we only need state } @@ -26,7 +26,7 @@ impl<'a> FinalPhase<'a> { self.common.len() + self.app_state.len() + self.state_arena.len() - + self.additional_arena.len() + + self.oplog_extra_arena.len() + self.oplog.len() + 10, ); @@ -37,8 +37,8 @@ impl<'a> FinalPhase<'a> { bytes.put_slice(&self.app_state); leb::write_unsigned(&mut bytes, self.state_arena.len() as u64); bytes.put_slice(&self.state_arena); - leb::write_unsigned(&mut bytes, self.additional_arena.len() as u64); - bytes.put_slice(&self.additional_arena); + leb::write_unsigned(&mut bytes, self.oplog_extra_arena.len() as u64); + bytes.put_slice(&self.oplog_extra_arena); leb::write_unsigned(&mut bytes, self.oplog.len() as u64); bytes.put_slice(&self.oplog); bytes.to_vec() @@ -70,7 +70,7 @@ impl<'a> FinalPhase<'a> { common: Cow::Borrowed(common), app_state: Cow::Borrowed(app_state), state_arena: Cow::Borrowed(state_arena), - additional_arena: Cow::Borrowed(additional_arena), + oplog_extra_arena: Cow::Borrowed(additional_arena), oplog: Cow::Borrowed(oplog), }) } @@ -79,7 +79,7 @@ impl<'a> FinalPhase<'a> { println!("common: {}", self.common.len()); println!("app_state: {}", self.app_state.len()); println!("state_arena: {}", self.state_arena.len()); - println!("additional_arena: {}", self.additional_arena.len()); + println!("additional_arena: {}", self.oplog_extra_arena.len()); println!("oplog: {}", self.oplog.len()); } } @@ -167,7 +167,7 @@ impl<'a> TempArena<'a> { } pub fn decode_additional_arena(data: &'a FinalPhase) -> Result { - serde_columnar::from_bytes(&data.additional_arena) + serde_columnar::from_bytes(&data.oplog_extra_arena) .map_err(|e| LoroError::DecodeError(e.to_string().into_boxed_str())) } }