From f6cc5da0f1f1a83aa462175a534812e341bf0250 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Mon, 22 Jan 2024 12:19:09 +0800 Subject: [PATCH] refactor: Optimizing Encoding Representation for Child Container Creation to Reduce Document Size (#247) * refactor: encoding container id * fix: container indexing when merged ops in encoding * chore: add compress encode size for draw example * fix: do not need cids in encoding * chore: change name containerIdx to containerType in encoding --- Cargo.lock | 1 + crates/examples/Cargo.toml | 1 + crates/examples/examples/draw.rs | 19 ++ .../src/encoding/encode_reordered.rs | 228 +++++++----------- .../src/fuzz/recursive_refactored.rs | 46 ++++ 5 files changed, 151 insertions(+), 144 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 305ec938..e2c68b85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -509,6 +509,7 @@ dependencies = [ "color-backtrace 0.6.1", "ctor 0.2.6", "debug-log", + "flate2", "loro", "serde_json", "tabled 0.15.0", diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index eb559b06..57c26347 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -14,5 +14,6 @@ debug-log = { version = "0.3.1", features = [] } serde_json = "1.0.111" [dev-dependencies] +flate2 = "1.0" color-backtrace = { version = "0.6" } ctor = "0.2" diff --git a/crates/examples/examples/draw.rs b/crates/examples/examples/draw.rs index d38a947b..b96be0fb 100644 --- a/crates/examples/examples/draw.rs +++ b/crates/examples/examples/draw.rs @@ -2,7 +2,10 @@ use std::time::Instant; use bench_utils::SyncKind; use examples::{draw::DrawActor, run_async_workflow, run_realtime_collab_workflow}; +use flate2::write::GzEncoder; +use flate2::Compression; use loro::{LoroDoc, ToJson}; +use std::io::prelude::*; use tabled::{settings::Style, Table, Tabled}; #[derive(Tabled)] @@ -13,7 +16,9 @@ struct BenchResult { ops_num: usize, changes_num: usize, snapshot_size: usize, + compressed_snapshot_size: usize, updates_size: usize, + compressed_updates_size: usize, apply_duration: f64, encode_snapshot_duration: f64, encode_udpate_duration: f64, @@ -22,6 +27,12 @@ struct BenchResult { doc_json_size: usize, } +fn compress(data: &[u8]) -> Vec { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(data).expect("Failed to write data"); + encoder.finish().expect("Failed to compress data") +} + pub fn main() { let seed = 123123; let ans = vec![ @@ -68,11 +79,13 @@ fn run_async(peer_num: usize, action_num: usize, seed: u64) -> BenchResult { let snapshot = actors.docs[0].doc.export_snapshot(); let encode_snapshot_duration = start.elapsed().as_secs_f64() * 1000.; let snapshot_size = snapshot.len(); + let compressed_snapshot_size = compress(&snapshot).len(); let start = Instant::now(); let updates = actors.docs[0].doc.export_from(&Default::default()); let encode_udpate_duration = start.elapsed().as_secs_f64() * 1000.; let updates_size = updates.len(); + let compressed_updates_size = compress(&updates).len(); let start = Instant::now(); let doc = LoroDoc::new(); @@ -90,9 +103,11 @@ fn run_async(peer_num: usize, action_num: usize, seed: u64) -> BenchResult { action_size: action_num, peer_num, snapshot_size, + compressed_snapshot_size, ops_num: actors.docs[0].doc.len_ops(), changes_num: actors.docs[0].doc.len_changes(), updates_size, + compressed_updates_size, apply_duration, encode_snapshot_duration, encode_udpate_duration, @@ -120,11 +135,13 @@ fn run_realtime_collab(peer_num: usize, action_num: usize, seed: u64) -> BenchRe let snapshot = actors.docs[0].doc.export_snapshot(); let encode_snapshot_duration = start.elapsed().as_secs_f64() * 1000.; let snapshot_size = snapshot.len(); + let compressed_snapshot_size = compress(&snapshot).len(); let start = Instant::now(); let updates = actors.docs[0].doc.export_from(&Default::default()); let encode_udpate_duration = start.elapsed().as_secs_f64() * 1000.; let updates_size = updates.len(); + let compressed_updates_size = compress(&updates).len(); let start = Instant::now(); let doc = LoroDoc::new(); @@ -145,7 +162,9 @@ fn run_realtime_collab(peer_num: usize, action_num: usize, seed: u64) -> BenchRe ops_num: actors.docs[0].doc.len_ops(), changes_num: actors.docs[0].doc.len_changes(), snapshot_size, + compressed_snapshot_size, updates_size, + compressed_updates_size, apply_duration, encode_snapshot_duration, encode_udpate_duration, diff --git a/crates/loro-internal/src/encoding/encode_reordered.rs b/crates/loro-internal/src/encoding/encode_reordered.rs index 07be0820..bc0ec012 100644 --- a/crates/loro-internal/src/encoding/encode_reordered.rs +++ b/crates/loro-internal/src/encoding/encode_reordered.rs @@ -290,7 +290,7 @@ fn extract_ops( { return Err(LoroError::DecodeDataCorruptionError); } - + let peer = peer_ids.peer_ids[peer_idx as usize]; let cid = &containers[container_index as usize]; let c_idx = arena.register_container(cid); let kind = ValueKind::from_u8(value_type).expect("Unknown value type"); @@ -302,10 +302,9 @@ fn extract_ops( prop, keys, &peer_ids.peer_ids, - &containers, + ID::new(peer, counter), )?; - let peer = peer_ids.peer_ids[peer_idx as usize]; let op = Op { counter, container: c_idx, @@ -1014,7 +1013,7 @@ fn decode_op( prop: i32, keys: &arena::KeyArena, peers: &[u64], - cids: &[ContainerID], + id: ID, ) -> LoroResult { let content = match cid.container_type() { ContainerType::Text => match kind { @@ -1037,7 +1036,7 @@ fn decode_op( )) } ValueKind::MarkStart => { - let mark = value_reader.read_mark(&keys.keys, cids)?; + let mark = value_reader.read_mark(&keys.keys, id)?; let key = keys .keys .get(mark.key_idx as usize) @@ -1069,7 +1068,7 @@ fn decode_op( crate::op::InnerContent::Map(crate::container::map::MapSet { key, value: None }) } kind => { - let value = value_reader.read_value_content(kind, &keys.keys, cids)?; + let value = value_reader.read_value_content(kind, &keys.keys, id)?; crate::op::InnerContent::Map(crate::container::map::MapSet { key, value: Some(value), @@ -1081,8 +1080,7 @@ fn decode_op( let pos = prop as usize; match kind { ValueKind::Array => { - let arr = - value_reader.read_value_content(ValueKind::Array, &keys.keys, cids)?; + let arr = value_reader.read_value_content(ValueKind::Array, &keys.keys, id)?; let range = arena.alloc_values( Arc::try_unwrap( arr.into_list() @@ -1261,7 +1259,8 @@ mod value { use fxhash::FxHashMap; use loro_common::{ - ContainerID, Counter, InternalString, LoroError, LoroResult, LoroValue, PeerID, TreeID, + ContainerID, ContainerType, Counter, InternalString, LoroError, LoroResult, LoroValue, + PeerID, TreeID, ID, }; use super::{encode::ValueRegister, MAX_COLLECTION_SIZE}; @@ -1344,7 +1343,7 @@ mod value { False = 2, DeleteOnce = 3, I32 = 4, - ContainerIdx = 5, + ContainerType = 5, F64 = 6, Str = 7, DeleteSeq = 8, @@ -1371,8 +1370,8 @@ mod value { Some(ValueKind::DeleteOnce) } else if n == ValueKind::I32 as u8 { Some(ValueKind::I32) - } else if n == ValueKind::ContainerIdx as u8 { - Some(ValueKind::ContainerIdx) + } else if n == ValueKind::ContainerType as u8 { + Some(ValueKind::ContainerType) } else if n == ValueKind::F64 as u8 { Some(ValueKind::F64) } else if n == ValueKind::Str as u8 { @@ -1417,7 +1416,7 @@ mod value { ValueKind::False => ValueKind::False as i64, ValueKind::DeleteOnce => ValueKind::DeleteOnce as i64, ValueKind::I32 => ValueKind::I32 as i64, - ValueKind::ContainerIdx => ValueKind::ContainerIdx as i64, + ValueKind::ContainerType => ValueKind::ContainerType as i64, ValueKind::F64 => ValueKind::F64 as i64, ValueKind::Str => ValueKind::Str as i64, ValueKind::DeleteSeq => ValueKind::DeleteSeq as i64, @@ -1444,7 +1443,7 @@ mod value { ValueKind::False => ValueKind::False as u8, ValueKind::DeleteOnce => ValueKind::DeleteOnce as u8, ValueKind::I32 => ValueKind::I32 as u8, - ValueKind::ContainerIdx => ValueKind::ContainerIdx as u8, + ValueKind::ContainerType => ValueKind::ContainerType as u8, ValueKind::F64 => ValueKind::F64 as u8, ValueKind::Str => ValueKind::Str as u8, ValueKind::DeleteSeq => ValueKind::DeleteSeq as u8, @@ -1467,7 +1466,7 @@ mod value { Value::False => ValueKind::False, Value::DeleteOnce => ValueKind::DeleteOnce, Value::I32(_) => ValueKind::I32, - Value::ContainerIdx(_) => ValueKind::ContainerIdx, + Value::ContainerIdx(_) => ValueKind::ContainerType, Value::F64(_) => ValueKind::F64, Value::Str(_) => ValueKind::Str, Value::DeleteSeq(_) => ValueKind::DeleteSeq, @@ -1493,7 +1492,7 @@ mod value { LoroValue::List(_) => ValueKind::Array, LoroValue::Map(_) => ValueKind::Map, LoroValue::Binary(_) => ValueKind::Binary, - LoroValue::Container(_) => ValueKind::ContainerIdx, + LoroValue::Container(_) => ValueKind::ContainerType, } } @@ -1559,9 +1558,8 @@ mod value { ValueKind::Binary } LoroValue::Container(c) => { - let idx = register_cid.register(c); - self.write_usize(idx); - ValueKind::ContainerIdx + self.write_u8(c.container_type().to_u8()); + ValueKind::ContainerType } } } @@ -1688,50 +1686,16 @@ mod value { ValueReader { raw } } - #[allow(unused)] - pub fn read( - &mut self, - kind: u8, - keys: &[InternalString], - cids: &[ContainerID], - ) -> LoroResult> { - let Some(kind) = ValueKind::from_u8(kind) else { - return Ok(Value::Unknown { - kind, - data: self.read_binary()?, - }); - }; - - Ok(match kind { - ValueKind::Null => Value::Null, - ValueKind::True => Value::True, - ValueKind::False => Value::False, - ValueKind::DeleteOnce => Value::DeleteOnce, - ValueKind::I32 => Value::I32(self.read_i32()?), - ValueKind::F64 => Value::F64(self.read_f64()?), - ValueKind::Str => Value::Str(self.read_str()?), - ValueKind::DeleteSeq => Value::DeleteSeq(self.read_i32()?), - ValueKind::DeltaInt => Value::DeltaInt(self.read_i32()?), - ValueKind::Array => Value::Array(self.read_array(keys, cids)?), - ValueKind::Map => Value::Map(self.read_map(keys, cids)?), - ValueKind::Binary => Value::Binary(self.read_binary()?), - ValueKind::MarkStart => Value::MarkStart(self.read_mark(keys, cids)?), - ValueKind::TreeMove => Value::TreeMove(self.read_tree_move()?), - ValueKind::ContainerIdx => Value::ContainerIdx(self.read_usize()?), - ValueKind::Unknown => unreachable!(), - }) - } - pub fn read_value_type_and_content( &mut self, keys: &[InternalString], - cids: &[ContainerID], + id: ID, ) -> LoroResult { let kind = self.read_u8()?; self.read_value_content( ValueKind::from_u8(kind).expect("Unknown value type"), keys, - cids, + id, ) } @@ -1739,7 +1703,7 @@ mod value { &mut self, kind: ValueKind, keys: &[InternalString], - cids: &[ContainerID], + id: ID, ) -> LoroResult { Ok(match kind { ValueKind::Null => LoroValue::Null, @@ -1754,10 +1718,11 @@ mod value { if len > MAX_COLLECTION_SIZE { return Err(LoroError::DecodeDataCorruptionError); } - let mut ans = Vec::with_capacity(len); - for _ in 0..len { - ans.push(self.recursive_read_value_type_and_content(keys, cids)?); + for i in 0..len { + ans.push( + self.recursive_read_value_type_and_content(keys, id.inc(i as i32))?, + ); } ans.into() } @@ -1766,7 +1731,6 @@ mod value { if len > MAX_COLLECTION_SIZE { return Err(LoroError::DecodeDataCorruptionError); } - let mut ans = FxHashMap::with_capacity_and_hasher(len, Default::default()); for _ in 0..len { let key_idx = self.read_usize()?; @@ -1774,17 +1738,18 @@ mod value { .get(key_idx) .ok_or(LoroError::DecodeDataCorruptionError)? .to_string(); - let value = self.recursive_read_value_type_and_content(keys, cids)?; + let value = self.recursive_read_value_type_and_content(keys, id)?; ans.insert(key, value); } ans.into() } ValueKind::Binary => LoroValue::Binary(Arc::new(self.read_binary()?.to_owned())), - ValueKind::ContainerIdx => LoroValue::Container( - cids.get(self.read_usize()?) - .ok_or(LoroError::DecodeDataCorruptionError)? - .clone(), - ), + ValueKind::ContainerType => { + let container_id = + ContainerID::new_normal(id, ContainerType::from_u8(self.read_u8()?)); + + LoroValue::Container(container_id) + } a => unreachable!("Unexpected value kind {:?}", a), }) } @@ -1796,7 +1761,7 @@ mod value { fn recursive_read_value_type_and_content( &mut self, keys: &[InternalString], - cids: &[ContainerID], + id: ID, ) -> LoroResult { #[derive(Debug)] enum Task { @@ -1893,11 +1858,13 @@ mod value { ValueKind::Binary => { LoroValue::Binary(Arc::new(self.read_binary()?.to_owned())) } - ValueKind::ContainerIdx => LoroValue::Container( - cids.get(self.read_usize()?) - .ok_or(LoroError::DecodeDataCorruptionError)? - .clone(), - ), + ValueKind::ContainerType => { + let container_id = ContainerID::new_normal( + id, + ContainerType::from_u8(self.read_u8()?), + ); + LoroValue::Container(container_id) + } a => unreachable!("Unexpected value kind {:?}", a), }; @@ -2020,48 +1987,6 @@ mod value { Ok(ans) } - fn read_array( - &mut self, - keys: &[InternalString], - cids: &[ContainerID], - ) -> LoroResult>> { - let len = self.read_usize()?; - if len > MAX_COLLECTION_SIZE { - return Err(LoroError::DecodeDataCorruptionError); - } - - let mut ans = Vec::with_capacity(len); - for _ in 0..len { - let kind = self.read_u8()?; - ans.push(self.read(kind, keys, cids)?); - } - Ok(ans) - } - - fn read_map( - &mut self, - keys: &[InternalString], - cids: &[ContainerID], - ) -> LoroResult>> { - let len = self.read_usize()?; - if len > MAX_COLLECTION_SIZE { - return Err(LoroError::DecodeDataCorruptionError); - } - - let mut ans = FxHashMap::with_capacity_and_hasher(len, Default::default()); - for _ in 0..len { - let key_idx = self.read_usize()?; - let key = keys - .get(key_idx) - .ok_or(LoroError::DecodeDataCorruptionError)? - .clone(); - let kind = self.read_u8()?; - let value = self.read(kind, keys, cids)?; - ans.insert(key, value); - } - Ok(ans) - } - fn read_binary(&mut self) -> LoroResult<&'a [u8]> { let len = self.read_usize()?; if self.raw.len() < len { @@ -2073,15 +1998,11 @@ mod value { Ok(ans) } - pub fn read_mark( - &mut self, - keys: &[InternalString], - cids: &[ContainerID], - ) -> LoroResult { + pub fn read_mark(&mut self, keys: &[InternalString], id: ID) -> LoroResult { let info = self.read_u8()?; let len = self.read_usize()?; let key_idx = self.read_usize()?; - let value = self.read_value_type_and_content(keys, cids)?; + let value = self.read_value_type_and_content(keys, id)?; Ok(MarkStart { len: len as u32, key_idx: key_idx as u32, @@ -2421,43 +2342,62 @@ mod test { use super::*; - fn test_loro_value_read_write(v: impl Into) { + fn test_loro_value_read_write(v: impl Into, container_id: Option) { let v = v.into(); let mut key_reg: ValueRegister = ValueRegister::new(); let mut cid_reg: ValueRegister = ValueRegister::new(); + let id = match &container_id { + Some(ContainerID::Root { .. }) => ID::new(u64::MAX, 0), + Some(ContainerID::Normal { peer, counter, .. }) => ID::new(*peer, *counter), + None => ID::new(u64::MAX, 0), + }; + let mut writer = ValueWriter::new(); let kind = writer.write_value_content(&v, &mut key_reg, &mut cid_reg); let binding = writer.finish(); let mut reader = ValueReader::new(binding.as_slice()); let keys = &key_reg.unwrap_vec(); - let cids = &cid_reg.unwrap_vec(); - let ans = reader.read_value_content(kind, keys, cids).unwrap(); + + let ans = reader.read_value_content(kind, keys, id).unwrap(); assert_eq!(v, ans) } #[test] fn test_value_read_write() { - test_loro_value_read_write(true); - test_loro_value_read_write(false); - test_loro_value_read_write(123); - test_loro_value_read_write(1.23); - test_loro_value_read_write(LoroValue::Null); - test_loro_value_read_write(LoroValue::Binary(Arc::new(vec![123, 223, 255, 0, 1, 2, 3]))); - test_loro_value_read_write("sldk;ajfas;dlkfas测试"); - test_loro_value_read_write(LoroValue::Container(ContainerID::new_root( - "name", - ContainerType::Text, - ))); - test_loro_value_read_write(LoroValue::Container(ContainerID::new_normal( - ID::new(u64::MAX, 123), - ContainerType::Tree, - ))); - test_loro_value_read_write(vec![1i32, 2, 3]); - test_loro_value_read_write(LoroValue::Map(Arc::new(fx_map![ - "1".into() => 123.into(), - "2".into() => "123".into(), - "3".into() => vec![true].into() - ]))); + test_loro_value_read_write(true, None); + test_loro_value_read_write(false, None); + test_loro_value_read_write(123, None); + test_loro_value_read_write(1.23, None); + test_loro_value_read_write(LoroValue::Null, None); + test_loro_value_read_write( + LoroValue::Binary(Arc::new(vec![123, 223, 255, 0, 1, 2, 3])), + None, + ); + test_loro_value_read_write("sldk;ajfas;dlkfas测试", None); + // we won't encode root container by `value content` + // test_loro_value_read_write( + // LoroValue::Container(ContainerID::new_root("name", ContainerType::Text)), + // Some(ContainerID::new_root("name", ContainerType::Text)), + // ); + test_loro_value_read_write( + LoroValue::Container(ContainerID::new_normal( + ID::new(u64::MAX, 123), + ContainerType::Tree, + )), + Some(ContainerID::new_normal( + ID::new(u64::MAX, 123), + ContainerType::Tree, + )), + ); + test_loro_value_read_write(vec![1i32, 2, 3], None); + test_loro_value_read_write( + LoroValue::Map(Arc::new(fx_map![ + "1".into() => 123.into(), + "2".into() => "123".into(), + "3".into() => vec![true].into() + ])), + None, + ); } } diff --git a/crates/loro-internal/src/fuzz/recursive_refactored.rs b/crates/loro-internal/src/fuzz/recursive_refactored.rs index ba408671..8fc30b39 100644 --- a/crates/loro-internal/src/fuzz/recursive_refactored.rs +++ b/crates/loro-internal/src/fuzz/recursive_refactored.rs @@ -1207,6 +1207,52 @@ mod failed_tests { ) } + #[test] + fn encoding_sub_container() { + test_multi_sites( + 5, + &mut [ + List { + site: 96, + container_idx: 96, + key: 96, + value: Container(C::Tree), + }, + List { + site: 96, + container_idx: 96, + key: 96, + value: Container(C::List), + }, + List { + site: 90, + container_idx: 96, + key: 96, + value: I32(1516265568), + }, + List { + site: 96, + container_idx: 96, + key: 7, + value: Container(C::Map), + }, + SyncAll, + Map { + site: 4, + container_idx: 21, + key: 64, + value: I32(-13828256), + }, + List { + site: 45, + container_idx: 89, + key: 235, + value: I32(2122219134), + }, + ], + ) + } + #[test] fn notify_causal_order_check() { test_multi_sites(