diff --git a/.vscode/settings.json b/.vscode/settings.json index 25d9f828..22f4fd2a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,7 @@ "cSpell.words": [ "arbtest", "bolds", + "bools", "cids", "clippy", "collab", diff --git a/crates/loro-common/src/lib.rs b/crates/loro-common/src/lib.rs index 92acc984..3141e4dd 100644 --- a/crates/loro-common/src/lib.rs +++ b/crates/loro-common/src/lib.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, io::Write, sync::Arc}; +use std::{fmt::Display, io::Write, str::Bytes, sync::Arc}; use arbitrary::Arbitrary; use enum_as_inner::EnumAsInner; @@ -243,7 +243,8 @@ impl ContainerID { name, container_type, } => { - writer.write_all(&[0, container_type.to_u8()])?; + let first_byte = container_type.to_u8() | 0b10000000; + writer.write_all(&[first_byte])?; leb128::write::unsigned(writer, name.len() as u64)?; writer.write_all(name.as_bytes())?; } @@ -252,14 +253,50 @@ impl ContainerID { counter, container_type, } => { - writer.write_all(&[1, container_type.to_u8()])?; + let first_byte = container_type.to_u8(); + writer.write_all(&[first_byte])?; writer.write_all(&peer.to_le_bytes())?; - leb128::write::unsigned(writer, *counter as u64)?; + writer.write_all(&counter.to_le_bytes())?; } } Ok(()) } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + self.encode(&mut bytes).unwrap(); + bytes + } + + pub fn from_bytes(bytes: &[u8]) -> Self { + let first_byte = bytes[0]; + let container_type = ContainerType::try_from_u8(first_byte & 0b01111111).unwrap(); + let is_root = (first_byte & 0b10000000) != 0; + + let mut reader = &bytes[1..]; + match is_root { + true => { + let name_len = leb128::read::unsigned(&mut reader).unwrap(); + let name = InternalString::from( + std::str::from_utf8(&reader[..name_len as usize]).unwrap(), + ); + Self::Root { + name, + container_type, + } + } + false => { + let peer = PeerID::from_le_bytes(reader[..8].try_into().unwrap()); + let counter = i32::from_le_bytes(reader[8..12].try_into().unwrap()); + Self::Normal { + peer, + counter, + container_type, + } + } + } + } } impl std::fmt::Debug for ContainerID { @@ -352,9 +389,7 @@ impl ContainerType { 4 => Ok(ContainerType::MovableList), #[cfg(feature = "counter")] 5 => Ok(ContainerType::Counter), - _ => Err(LoroError::DecodeError( - format!("Unknown container type {v}").into_boxed_str(), - )), + x => Ok(ContainerType::Unknown(x)), } } } @@ -617,7 +652,7 @@ pub mod wasm { #[cfg(test)] mod test { - use crate::ContainerID; + use crate::{ContainerID, ContainerType, ID}; #[test] fn test_container_id_convert_to_and_from_str() { @@ -654,4 +689,38 @@ mod test { assert!(ContainerID::try_from("id:0@0:Map").is_err()); assert!(ContainerID::try_from("cid:0@0:Unknown(6)").is_ok()); } + + #[test] + fn test_container_id_encode_and_decode() { + let id = ContainerID::new_normal(ID::new(1, 2), ContainerType::Map); + let bytes = id.to_bytes(); + assert_eq!(ContainerID::from_bytes(&bytes), id); + + let id = ContainerID::new_normal(ID::new(u64::MAX, i32::MAX), ContainerType::Text); + let bytes = id.to_bytes(); + assert_eq!(ContainerID::from_bytes(&bytes), id); + + let id = ContainerID::new_root("test_root", ContainerType::List); + let bytes = id.to_bytes(); + assert_eq!(ContainerID::from_bytes(&bytes), id); + + let id = ContainerID::new_normal(ID::new(0, 0), ContainerType::MovableList); + let bytes = id.to_bytes(); + assert_eq!(ContainerID::from_bytes(&bytes), id); + + let id = ContainerID::new_root(&"x".repeat(1024), ContainerType::Tree); + let bytes = id.to_bytes(); + assert_eq!(ContainerID::from_bytes(&bytes), id); + + #[cfg(feature = "counter")] + { + let id = ContainerID::new_normal(ID::new(42, 100), ContainerType::Counter); + let bytes = id.to_bytes(); + assert_eq!(ContainerID::from_bytes(&bytes), id); + } + + let id = ContainerID::new_normal(ID::new(1, 1), ContainerType::Unknown(100)); + let bytes = id.to_bytes(); + assert_eq!(ContainerID::from_bytes(&bytes), id); + } } diff --git a/crates/loro-internal/src/state.rs b/crates/loro-internal/src/state.rs index abfcfbbe..3a562158 100644 --- a/crates/loro-internal/src/state.rs +++ b/crates/loro-internal/src/state.rs @@ -706,10 +706,10 @@ impl DocState { self.store.iter_and_decode_all() } - pub(crate) fn iter_wrapper_mut( + pub(crate) fn iter_all_containers_mut( &mut self, ) -> impl Iterator { - self.store.iter_mut() + self.store.iter_all_containers() } pub(crate) fn init_container( @@ -792,7 +792,7 @@ impl DocState { if self.is_recording() { let diff: Vec<_> = self .store - .iter_mut() + .iter_all_containers() .map(|(&idx, state)| InternalContainerDiff { idx, bring_back: false, diff --git a/crates/loro-internal/src/state/analyzer.rs b/crates/loro-internal/src/state/analyzer.rs index f4ec57a8..52b6a526 100644 --- a/crates/loro-internal/src/state/analyzer.rs +++ b/crates/loro-internal/src/state/analyzer.rs @@ -40,7 +40,7 @@ impl DocAnalysis { let mut containers = FxHashMap::default(); let mut state = doc.app_state().lock().unwrap(); let alive_containers = state.get_all_alive_containers(); - for (&idx, c) in state.iter_wrapper_mut() { + for (&idx, c) in state.iter_all_containers_mut() { let ops_num = ops_nums.get(&idx).unwrap_or(&0); let id = doc.arena().get_container_id(idx).unwrap(); let dropped = !alive_containers.contains(&id); diff --git a/crates/loro-internal/src/state/container_store.rs b/crates/loro-internal/src/state/container_store.rs index 79c97432..e0f8d75d 100644 --- a/crates/loro-internal/src/state/container_store.rs +++ b/crates/loro-internal/src/state/container_store.rs @@ -1,8 +1,6 @@ -use std::{ - cmp::Ordering, - sync::{atomic::AtomicU64, Arc}, -}; - +#[cfg(feature = "counter")] +use super::counter_state::CounterState; +use super::{ContainerCreationContext, MovableListState, State, TreeState}; use crate::{ arena::SharedArena, configure::Configure, @@ -10,17 +8,15 @@ use crate::{ state::{FastStateSnapshot, RichtextState}, }; use bytes::Bytes; -use encode::{decode_cids, CidOffsetEncoder}; use fxhash::FxHashMap; +use inner_store::InnerStore; use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue}; +use std::sync::{atomic::AtomicU64, Arc}; -use super::{ - unknown_state::UnknownState, ContainerCreationContext, ContainerState, ListState, MapState, - MovableListState, State, TreeState, -}; +pub(crate) use container_wrapper::ContainerWrapper; -#[cfg(feature = "counter")] -use super::counter_state::CounterState; +mod container_wrapper; +mod inner_store; /// Encoding Schema for Container Store /// @@ -55,10 +51,9 @@ use super::counter_state::CounterState; /// │ │ /// │ │ /// └───────────────────────────────────────────────────┘ -#[derive(Clone)] pub(crate) struct ContainerStore { arena: SharedArena, - store: FxHashMap, + store: InnerStore, conf: Configure, peer: Arc, } @@ -83,120 +78,42 @@ macro_rules! ctx { impl ContainerStore { pub fn new(arena: SharedArena, conf: Configure, peer: Arc) -> Self { ContainerStore { + store: InnerStore::new(arena.clone()), arena, - store: Default::default(), conf, peer, } } pub fn get_container_mut(&mut self, idx: ContainerIdx) -> Option<&mut State> { - self.store.get_mut(&idx).map(|x| { - x.get_state_mut( - idx, - ContainerCreationContext { - configure: &self.conf, - peer: self.peer.load(std::sync::atomic::Ordering::Relaxed), - }, - ) - }) + self.store + .get_mut(idx) + .map(|x| x.get_state_mut(idx, ctx!(self))) } #[allow(unused)] pub fn get_container(&mut self, idx: ContainerIdx) -> Option<&State> { - self.store.get_mut(&idx).map(|x| { - x.get_state( - idx, - ContainerCreationContext { - configure: &self.conf, - peer: self.peer.load(std::sync::atomic::Ordering::Relaxed), - }, - ) - }) + self.store + .get_mut(idx) + .map(|x| x.get_state(idx, ctx!(self))) } pub fn get_value(&mut self, idx: ContainerIdx) -> Option { - self.store.get_mut(&idx).map(|c| { - c.get_value( - idx, - ContainerCreationContext { - configure: &self.conf, - peer: self.peer.load(std::sync::atomic::Ordering::Relaxed), - }, - ) - }) + self.store + .get_mut(idx) + .map(|c| c.get_value(idx, ctx!(self))) } pub fn encode(&mut self) -> Bytes { - let mut id_bytes_pairs = Vec::with_capacity(self.store.len()); - for (idx, container) in self.store.iter_mut() { - let id = self.arena.get_container_id(*idx).unwrap(); - let encoded = container.encode(); - #[cfg(debug_assertions)] - { - let mut new = ContainerWrapper::new_from_bytes(encoded.clone()); - let value = new.get_value(*idx, ctx!(self)); - assert_eq!(value, container.get_value(*idx, ctx!(self))); - } - - id_bytes_pairs.push((id, encoded)) - } - - id_bytes_pairs.sort_by(|(a, _), (b, _)| a.cmp(b)); - let mut id_encoder = CidOffsetEncoder::new(); - let mut offset = 0; - for (id, bytes) in id_bytes_pairs.iter() { - id_encoder.push(id, offset); - offset += bytes.len(); - } - - let mut bytes = Vec::with_capacity(self.store.len() * 4 + 4); - // prepend 4 zeros - bytes.resize(4, 0); - // append the encoded cids - id_encoder.write_to_io(&mut bytes); - // set the first 4 bytes of bytes to the length of itself - let len = bytes.len() as u32; - bytes[0] = (len & 0xff) as u8; - bytes[1] = ((len >> 8) & 0xff) as u8; - bytes[2] = ((len >> 16) & 0xff) as u8; - bytes[3] = ((len >> 24) & 0xff) as u8; - for (_id, b) in id_bytes_pairs.iter() { - bytes.extend_from_slice(b); - } - - bytes.into() + self.store.encode() } pub fn decode(&mut self, bytes: Bytes) -> LoroResult<()> { - assert!(self.is_empty()); - let offset = u32::from_le_bytes((&bytes[..4]).try_into().unwrap()) as usize; - let cids = &bytes[4..offset]; - let cids = decode_cids(cids)?; - let container_bytes = bytes.slice(offset..); - for (i, (cid, offset_start)) in cids.iter().enumerate() { - let offset_end = if i < cids.len() - 1 { - cids[i + 1].1 - } else { - container_bytes.len() - }; - - let container = - ContainerWrapper::new_from_bytes(container_bytes.slice(*offset_start..offset_end)); - let idx = self.arena.register_container(cid); - let p = container - .parent - .as_ref() - .map(|p| self.arena.register_container(p)); - self.arena.set_parent(idx, p); - self.store.insert(idx, container); - } - - Ok(()) + self.store.decode(bytes) } pub fn iter_and_decode_all(&mut self) -> impl Iterator { - self.store.iter_mut().map(|(idx, v)| { + self.store.iter_all_containers_mut().map(|(idx, v)| { v.get_state_mut( *idx, ContainerCreationContext { @@ -215,19 +132,15 @@ impl ContainerStore { self.store.len() } - #[allow(unused)] - pub fn iter(&self) -> impl Iterator { - self.store.iter() - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.store.iter_mut() + pub fn iter_all_containers( + &mut self, + ) -> impl Iterator { + self.store.iter_all_containers_mut() } pub(super) fn get_or_create_mut(&mut self, idx: ContainerIdx) -> &mut State { self.store - .entry(idx) - .or_insert_with(|| { + .get_or_insert_with(idx, || { let state = super::create_state_( idx, &self.conf, @@ -240,8 +153,7 @@ impl ContainerStore { pub(super) fn get_or_create_imm(&mut self, idx: ContainerIdx) -> &State { self.store - .entry(idx) - .or_insert_with(|| { + .get_or_insert_with(idx, || { let state = super::create_state_( idx, &self.conf, @@ -253,16 +165,7 @@ impl ContainerStore { } pub(crate) fn estimate_size(&self) -> usize { - self.store.len() * 4 - + self - .store - .values() - .map(|v| v.estimate_size()) - .sum::() - } - - pub(super) fn contains(&self, idx: ContainerIdx) -> bool { - self.store.contains_key(&idx) + self.store.estimate_size() } pub(crate) fn fork( @@ -271,14 +174,9 @@ impl ContainerStore { peer: Arc, config: Configure, ) -> ContainerStore { - let mut store = FxHashMap::default(); - for (idx, container) in self.store.iter() { - store.insert(*idx, container.clone()); - } - ContainerStore { + store: self.store.fork(arena.clone()), arena, - store, conf: config, peer, } @@ -290,12 +188,12 @@ impl ContainerStore { panic!("store len mismatch"); } - for (idx, container) in self.store.iter_mut() { + for (idx, container) in self.store.iter_all_containers_mut() { let id = self.arena.get_container_id(*idx).unwrap(); let other_idx = other.arena.register_container(&id); let other_container = other .store - .get_mut(&other_idx) + .get_mut(other_idx) .expect("container not found on other store"); let other_id = other.arena.get_container_id(other_idx).unwrap(); assert_eq!( @@ -319,7 +217,7 @@ impl ContainerStore { other_container .decode_state(other_idx, ctx!(other)) .unwrap(); - other_container.bytes = None; + other_container.clear_bytes(); if container.encode() != other_container.encode() { panic!( "container mismatch Origin: {:#?}, New: {:#?}", @@ -330,242 +228,6 @@ impl ContainerStore { } } -#[derive(Clone, Debug)] -pub(crate) struct ContainerWrapper { - depth: usize, - kind: ContainerType, - parent: Option, - /// The possible combinations of is_some() are: - /// - /// 1. bytes: new container decoded from bytes - /// 2. bytes + value: new container decoded from bytes, with value decoded - /// 3. state + bytes + value: new container decoded from bytes, with value and state decoded - /// 4. state - bytes: Option, - value: Option, - bytes_offset_for_value: Option, - bytes_offset_for_state: Option, - state: Option, -} - -impl ContainerWrapper { - pub fn new(state: State, arena: &SharedArena) -> Self { - let idx = state.container_idx(); - let parent = arena - .get_parent(idx) - .and_then(|p| arena.get_container_id(p)); - let depth = arena.get_depth(idx).unwrap().get() as usize; - Self { - depth, - parent, - kind: idx.get_type(), - state: Some(state), - bytes: None, - value: None, - bytes_offset_for_state: None, - bytes_offset_for_value: None, - } - } - - pub fn depth(&self) -> usize { - self.depth - } - - /// It will not decode the state if it is not decoded - #[allow(unused)] - pub fn try_get_state(&self) -> Option<&State> { - self.state.as_ref() - } - - /// It will decode the state if it is not decoded - pub fn get_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &State { - self.decode_state(idx, ctx).unwrap(); - self.state.as_ref().expect("ContainerWrapper is empty") - } - - /// It will decode the state if it is not decoded - pub fn get_state_mut( - &mut self, - idx: ContainerIdx, - ctx: ContainerCreationContext, - ) -> &mut State { - self.decode_state(idx, ctx).unwrap(); - self.bytes = None; - self.value = None; - self.state.as_mut().unwrap() - } - - pub fn get_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroValue { - if let Some(v) = self.value.as_ref() { - return v.clone(); - } - - self.decode_value(idx, ctx).unwrap(); - if self.value.is_none() { - return self.state.as_mut().unwrap().get_value(); - } - - self.value.as_ref().unwrap().clone() - } - - pub fn encode(&mut self) -> Bytes { - if let Some(bytes) = self.bytes.as_ref() { - return bytes.clone(); - } - - // ContainerType - // Depth - // ParentID - // StateSnapshot - let mut output = Vec::new(); - output.push(self.kind.to_u8()); - leb128::write::unsigned(&mut output, self.depth as u64).unwrap(); - postcard::to_io(&self.parent, &mut output).unwrap(); - self.state - .as_mut() - .unwrap() - .encode_snapshot_fast(&mut output); - let ans: Bytes = output.into(); - self.bytes = Some(ans.clone()); - ans - } - - pub fn new_from_bytes(b: Bytes) -> Self { - let src: &[u8] = &b; - let bytes: &[u8] = &b; - let kind = ContainerType::try_from_u8(bytes[0]).unwrap(); - let mut bytes = &bytes[1..]; - let depth = leb128::read::unsigned(&mut bytes).unwrap(); - let (parent, bytes) = postcard::take_from_bytes(bytes).unwrap(); - // SAFETY: bytes is a slice of b - let size = unsafe { bytes.as_ptr().offset_from(src.as_ptr()) }; - Self { - depth: depth as usize, - kind, - parent, - state: None, - value: None, - bytes: Some(b.clone()), - bytes_offset_for_value: Some(size as usize), - bytes_offset_for_state: None, - } - } - - #[allow(unused)] - pub fn ensure_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &LoroValue { - if self.value.is_some() { - } else if self.state.is_some() { - let value = self.state.as_mut().unwrap().get_value(); - self.value = Some(value); - } else { - self.decode_value(idx, ctx).unwrap(); - } - - self.value.as_ref().unwrap() - } - - fn decode_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroResult<()> { - if self.value.is_some() || self.state.is_some() { - return Ok(()); - } - - let Some(b) = self.bytes.as_ref() else { - return Ok(()); - }; - - if self.bytes_offset_for_value.is_none() { - let src: &[u8] = b; - let mut bytes: &[u8] = b; - bytes = &bytes[1..]; - let _depth = leb128::read::unsigned(&mut bytes).unwrap(); - let (_parent, bytes) = postcard::take_from_bytes::>(bytes).unwrap(); - // SAFETY: bytes is a slice of b - let size = unsafe { bytes.as_ptr().offset_from(src.as_ptr()) }; - self.bytes_offset_for_value = Some(size as usize); - } - - let value_offset = self.bytes_offset_for_value.unwrap(); - let b = &b[value_offset..]; - - let (v, rest) = match self.kind { - ContainerType::Text => RichtextState::decode_value(b)?, - ContainerType::Map => MapState::decode_value(b)?, - ContainerType::List => ListState::decode_value(b)?, - ContainerType::MovableList => MovableListState::decode_value(b)?, - ContainerType::Tree => { - let mut state = TreeState::decode_snapshot_fast(idx, (LoroValue::Null, b), ctx)?; - self.value = Some(state.get_value()); - self.state = Some(State::TreeState(Box::new(state))); - self.bytes_offset_for_state = Some(value_offset); - return Ok(()); - } - #[cfg(feature = "counter")] - ContainerType::Counter => { - let (v, _rest) = CounterState::decode_value(b)?; - self.value = Some(v); - self.bytes_offset_for_state = Some(0); - return Ok(()); - } - ContainerType::Unknown(_) => UnknownState::decode_value(b)?, - }; - - self.value = Some(v); - // SAFETY: rest is a slice of b - let offset = unsafe { rest.as_ptr().offset_from(b.as_ptr()) }; - self.bytes_offset_for_state = Some(offset as usize + value_offset); - Ok(()) - } - - fn decode_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroResult<()> { - if self.state.is_some() { - return Ok(()); - } - - if self.value.is_none() { - self.decode_value(idx, ctx)?; - } - - let b = self.bytes.as_ref().unwrap(); - let offset = self.bytes_offset_for_state.unwrap(); - let b = &b[offset..]; - let v = self.value.as_ref().unwrap().clone(); - let state: State = match self.kind { - ContainerType::Text => RichtextState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), - ContainerType::Map => MapState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), - ContainerType::List => ListState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), - ContainerType::MovableList => { - MovableListState::decode_snapshot_fast(idx, (v, b), ctx)?.into() - } - ContainerType::Tree => TreeState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), - #[cfg(feature = "counter")] - ContainerType::Counter => CounterState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), - ContainerType::Unknown(_) => { - UnknownState::decode_snapshot_fast(idx, (v, b), ctx)?.into() - } - }; - self.state = Some(state); - Ok(()) - } - - pub fn estimate_size(&self) -> usize { - if let Some(bytes) = self.bytes.as_ref() { - return bytes.len(); - } - - self.state.as_ref().unwrap().estimate_size() - } - - #[allow(unused)] - pub(crate) fn is_state_empty(&self) -> bool { - if let Some(state) = self.state.as_ref() { - return state.is_state_empty(); - } - - // FIXME: it's not very accurate... - self.bytes.as_ref().unwrap().len() > 10 - } -} - mod encode { use loro_common::{ContainerID, ContainerType, Counter, InternalString, LoroError, LoroResult}; use serde::{Deserialize, Serialize}; diff --git a/crates/loro-internal/src/state/container_store/container_wrapper.rs b/crates/loro-internal/src/state/container_store/container_wrapper.rs new file mode 100644 index 00000000..b39f82a4 --- /dev/null +++ b/crates/loro-internal/src/state/container_store/container_wrapper.rs @@ -0,0 +1,275 @@ +use bytes::Bytes; +use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue}; + +#[cfg(feature = "counter")] +use crate::state::counter_state::CounterState; +use crate::{ + arena::SharedArena, + container::idx::ContainerIdx, + state::{ + unknown_state::UnknownState, ContainerCreationContext, ContainerState, FastStateSnapshot, + ListState, MapState, MovableListState, RichtextState, State, TreeState, + }, +}; + +#[derive(Clone, Debug)] +pub(crate) struct ContainerWrapper { + depth: usize, + kind: ContainerType, + parent: Option, + /// The possible combinations of is_some() are: + /// + /// 1. bytes: new container decoded from bytes + /// 2. bytes + value: new container decoded from bytes, with value decoded + /// 3. state + bytes + value: new container decoded from bytes, with value and state decoded + /// 4. state + bytes: Option, + value: Option, + bytes_offset_for_value: Option, + bytes_offset_for_state: Option, + state: Option, + flushed: bool, +} + +impl ContainerWrapper { + pub fn new(state: State, arena: &SharedArena) -> Self { + let idx = state.container_idx(); + let parent = arena + .get_parent(idx) + .and_then(|p| arena.get_container_id(p)); + let depth = arena.get_depth(idx).unwrap().get() as usize; + Self { + depth, + parent, + kind: idx.get_type(), + state: Some(state), + bytes: None, + value: None, + bytes_offset_for_state: None, + bytes_offset_for_value: None, + flushed: false, + } + } + + pub fn depth(&self) -> usize { + self.depth + } + + /// It will not decode the state if it is not decoded + #[allow(unused)] + pub fn try_get_state(&self) -> Option<&State> { + self.state.as_ref() + } + + /// It will decode the state if it is not decoded + pub fn get_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &State { + self.decode_state(idx, ctx).unwrap(); + self.state.as_ref().expect("ContainerWrapper is empty") + } + + /// It will decode the state if it is not decoded + pub fn get_state_mut( + &mut self, + idx: ContainerIdx, + ctx: ContainerCreationContext, + ) -> &mut State { + self.decode_state(idx, ctx).unwrap(); + self.bytes = None; + self.value = None; + self.flushed = false; + self.state.as_mut().unwrap() + } + + pub fn get_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroValue { + if let Some(v) = self.value.as_ref() { + return v.clone(); + } + + self.decode_value(idx, ctx).unwrap(); + if self.value.is_none() { + return self.state.as_mut().unwrap().get_value(); + } + + self.value.as_ref().unwrap().clone() + } + + pub fn encode(&mut self) -> Bytes { + if let Some(bytes) = self.bytes.as_ref() { + return bytes.clone(); + } + + // ContainerType + // Depth + // ParentID + // StateSnapshot + let mut output = Vec::new(); + output.push(self.kind.to_u8()); + leb128::write::unsigned(&mut output, self.depth as u64).unwrap(); + postcard::to_io(&self.parent, &mut output).unwrap(); + self.state + .as_mut() + .unwrap() + .encode_snapshot_fast(&mut output); + let ans: Bytes = output.into(); + self.bytes = Some(ans.clone()); + ans + } + + pub fn decode_parent(b: &[u8]) -> Option { + let mut bytes = &b[1..]; + let _depth = leb128::read::unsigned(&mut bytes).unwrap(); + let (parent, _bytes) = postcard::take_from_bytes::>(bytes).unwrap(); + parent + } + + pub fn new_from_bytes(bytes: Bytes) -> Self { + let kind = ContainerType::try_from_u8(bytes[0]).unwrap(); + let mut reader = &bytes[1..]; + let depth = leb128::read::unsigned(&mut reader).unwrap(); + let (parent, reader) = postcard::take_from_bytes(reader).unwrap(); + let size = bytes.len() - reader.len(); + Self { + depth: depth as usize, + kind, + parent, + state: None, + value: None, + bytes: Some(bytes.clone()), + bytes_offset_for_value: Some(size), + bytes_offset_for_state: None, + flushed: true, + } + } + + #[allow(unused)] + pub fn ensure_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &LoroValue { + if self.value.is_some() { + } else if self.state.is_some() { + let value = self.state.as_mut().unwrap().get_value(); + self.value = Some(value); + } else { + self.decode_value(idx, ctx).unwrap(); + } + + self.value.as_ref().unwrap() + } + + fn decode_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroResult<()> { + if self.value.is_some() || self.state.is_some() { + return Ok(()); + } + + let Some(bytes) = self.bytes.as_ref() else { + return Ok(()); + }; + + if self.bytes_offset_for_value.is_none() { + let mut reader: &[u8] = bytes; + reader = &reader[1..]; + let _depth = leb128::read::unsigned(&mut reader).unwrap(); + let (_parent, reader) = + postcard::take_from_bytes::>(reader).unwrap(); + // SAFETY: bytes is a slice of b + let size = bytes.len() - reader.len(); + self.bytes_offset_for_value = Some(size); + } + + let value_offset = self.bytes_offset_for_value.unwrap(); + let b = &bytes[value_offset..]; + + let (v, rest) = match self.kind { + ContainerType::Text => RichtextState::decode_value(b)?, + ContainerType::Map => MapState::decode_value(b)?, + ContainerType::List => ListState::decode_value(b)?, + ContainerType::MovableList => MovableListState::decode_value(b)?, + ContainerType::Tree => { + let mut state = TreeState::decode_snapshot_fast(idx, (LoroValue::Null, b), ctx)?; + self.value = Some(state.get_value()); + self.state = Some(State::TreeState(Box::new(state))); + self.bytes_offset_for_state = Some(value_offset); + return Ok(()); + } + #[cfg(feature = "counter")] + ContainerType::Counter => { + let (v, _rest) = CounterState::decode_value(b)?; + self.value = Some(v); + self.bytes_offset_for_state = Some(0); + return Ok(()); + } + ContainerType::Unknown(_) => UnknownState::decode_value(b)?, + }; + + self.value = Some(v); + let offset = b.len() - rest.len(); + self.bytes_offset_for_state = Some(offset + value_offset); + Ok(()) + } + + pub(super) fn decode_state( + &mut self, + idx: ContainerIdx, + ctx: ContainerCreationContext, + ) -> LoroResult<()> { + if self.state.is_some() { + return Ok(()); + } + + if self.value.is_none() { + self.decode_value(idx, ctx)?; + } + + let b = self.bytes.as_ref().unwrap(); + let offset = self.bytes_offset_for_state.unwrap(); + let b = &b[offset..]; + let v = self.value.as_ref().unwrap().clone(); + let state: State = match self.kind { + ContainerType::Text => RichtextState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + ContainerType::Map => MapState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + ContainerType::List => ListState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + ContainerType::MovableList => { + MovableListState::decode_snapshot_fast(idx, (v, b), ctx)?.into() + } + ContainerType::Tree => TreeState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + #[cfg(feature = "counter")] + ContainerType::Counter => CounterState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + ContainerType::Unknown(_) => { + UnknownState::decode_snapshot_fast(idx, (v, b), ctx)?.into() + } + }; + self.state = Some(state); + Ok(()) + } + + pub fn estimate_size(&self) -> usize { + if let Some(bytes) = self.bytes.as_ref() { + return bytes.len(); + } + + self.state.as_ref().unwrap().estimate_size() + } + + #[allow(unused)] + pub(crate) fn is_state_empty(&self) -> bool { + if let Some(state) = self.state.as_ref() { + return state.is_state_empty(); + } + + // FIXME: it's not very accurate... + self.bytes.as_ref().unwrap().len() > 10 + } + + pub(crate) fn clear_bytes(&mut self) { + assert!(self.state.is_some()); + self.bytes = None; + self.bytes_offset_for_state = None; + self.bytes_offset_for_value = None; + } + + pub(crate) fn is_flushed(&self) -> bool { + self.flushed + } + + pub(crate) fn parent(&self) -> Option<&ContainerID> { + self.parent.as_ref() + } +} diff --git a/crates/loro-internal/src/state/container_store/inner_store.rs b/crates/loro-internal/src/state/container_store/inner_store.rs new file mode 100644 index 00000000..df39be27 --- /dev/null +++ b/crates/loro-internal/src/state/container_store/inner_store.rs @@ -0,0 +1,174 @@ +use std::ops::Bound; + +use bytes::Bytes; +use fxhash::FxHashMap; +use loro_common::ContainerID; + +use crate::{arena::SharedArena, container::idx::ContainerIdx, utils::kv_wrapper::KvWrapper}; + +use super::ContainerWrapper; + +/// The invariants about this struct: +/// +/// - `len` is the number of containers in the store. If a container is in both kv and store, +/// it should only take 1 space in `len`. +/// - `kv` is either the same or older than `store`. +/// - if `all_loaded` is true, then `store` contains all the entries from `kv` +pub(super) struct InnerStore { + arena: SharedArena, + store: FxHashMap, + kv: KvWrapper, + len: usize, + all_loaded: bool, +} + +impl std::fmt::Debug for InnerStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InnerStore").finish() + } +} + +/// This impl block contains all the mutation code that may break the invariants of this struct +impl InnerStore { + pub(super) fn get_or_insert_with( + &mut self, + idx: ContainerIdx, + f: impl FnOnce() -> ContainerWrapper, + ) -> &mut ContainerWrapper { + if let std::collections::hash_map::Entry::Vacant(e) = self.store.entry(idx) { + let id = self.arena.get_container_id(idx).unwrap(); + let key = id.to_bytes(); + if !self.all_loaded && self.kv.contains_key(&key) { + let c = ContainerWrapper::new_from_bytes(self.kv.get(&key).unwrap()); + e.insert(c); + return self.store.get_mut(&idx).unwrap(); + } else { + let c = f(); + e.insert(c); + self.len += 1; + } + } + + self.store.get_mut(&idx).unwrap() + } + + pub(crate) fn get_mut(&mut self, idx: ContainerIdx) -> Option<&mut ContainerWrapper> { + if let std::collections::hash_map::Entry::Vacant(e) = self.store.entry(idx) { + let id = self.arena.get_container_id(idx).unwrap(); + let key = id.to_bytes(); + if !self.all_loaded && self.kv.contains_key(&key) { + let c = ContainerWrapper::new_from_bytes(self.kv.get(&key).unwrap()); + e.insert(c); + } + } + + self.store.get_mut(&idx) + } + + pub(crate) fn iter_all_containers_mut( + &mut self, + ) -> impl Iterator { + self.load_all(); + self.store.iter_mut() + } + + pub(crate) fn encode(&mut self) -> Bytes { + self.kv + .set_all(self.store.iter_mut().filter_map(|(idx, c)| { + if c.is_flushed() { + return None; + } + + let key = self.arena.get_container_id(*idx).unwrap(); + let key: Bytes = key.to_bytes().into(); + let value = c.encode(); + Some((key, value)) + })); + self.kv.export() + } + + pub(crate) fn decode(&mut self, bytes: bytes::Bytes) -> Result<(), loro_common::LoroError> { + assert!(self.len == 0); + self.kv.import(bytes); + self.kv.with_kv(|kv| { + let mut count = 0; + let iter = kv.scan(Bound::Unbounded, Bound::Unbounded); + for (k, v) in iter { + count += 1; + let cid = ContainerID::from_bytes(&k); + let parent = ContainerWrapper::decode_parent(&v); + let idx = self.arena.register_container(&cid); + let p = parent.as_ref().map(|p| self.arena.register_container(p)); + self.arena.set_parent(idx, p); + } + + self.len = count; + }); + + self.all_loaded = false; + Ok(()) + } + + fn load_all(&mut self) { + if self.all_loaded { + return; + } + + self.kv.with_kv(|kv| { + let iter = kv.scan(Bound::Unbounded, Bound::Unbounded); + for (k, v) in iter { + let cid = ContainerID::from_bytes(&k); + let idx = self.arena.register_container(&cid); + if self.store.contains_key(&idx) { + // the container is already loaded + // the content in `store` is guaranteed to be newer than the content in `kv` + continue; + } + + let container = ContainerWrapper::new_from_bytes(v); + self.store.insert(idx, container); + } + }); + + self.all_loaded = true; + } +} + +impl InnerStore { + pub(crate) fn new(arena: SharedArena) -> Self { + Self { + arena, + store: FxHashMap::default(), + kv: KvWrapper::new_mem(), + len: 0, + all_loaded: true, + } + } + + pub(crate) fn fork(&self, arena: SharedArena) -> InnerStore { + InnerStore { + arena, + store: self.store.clone(), + kv: self.kv.clone(), + len: self.len, + all_loaded: self.all_loaded, + } + } + + pub(crate) fn len(&self) -> usize { + self.len + } + + pub(crate) fn is_empty(&self) -> bool { + self.len == 0 + } + + pub(crate) fn estimate_size(&self) -> usize { + self.kv.with_kv(|kv| kv.size()) + + self + .store + .values() + .map(|c| if c.is_flushed() { 0 } else { c.estimate_size() }) + .sum::() + } +} diff --git a/crates/loro-internal/src/utils/kv_wrapper.rs b/crates/loro-internal/src/utils/kv_wrapper.rs new file mode 100644 index 00000000..4e120a65 --- /dev/null +++ b/crates/loro-internal/src/utils/kv_wrapper.rs @@ -0,0 +1,71 @@ +use std::{ + collections::BTreeMap, + ops::Bound, + sync::{Arc, Mutex}, +}; + +use bytes::Bytes; +use fxhash::FxHashMap; +use loro_common::ContainerID; + +use crate::kv_store::KvStore; + +pub(crate) enum Status { + BytesOnly, + ImmBoth, + MutState, +} + +/// This thin wrapper aims to limit the ability to modify the kv store and make +/// it easy to find all the modifications. +pub(crate) struct KvWrapper { + kv: Arc>, +} + +impl Clone for KvWrapper { + fn clone(&self) -> Self { + Self { + kv: self.kv.lock().unwrap().clone_store(), + } + } +} + +impl KvWrapper { + pub fn new_mem() -> Self { + Self { + kv: Arc::new(Mutex::new(BTreeMap::new())), + } + } + + pub fn import(&self, bytes: Bytes) { + let mut kv = self.kv.lock().unwrap(); + assert!(kv.len() == 0); + kv.import_all(bytes); + } + + pub fn export(&self) -> Bytes { + let kv = self.kv.lock().unwrap(); + kv.export_all() + } + + pub fn get(&self, key: &[u8]) -> Option { + let kv = self.kv.lock().unwrap(); + kv.get(key) + } + + pub fn with_kv(&self, f: impl FnOnce(&dyn KvStore) -> R) -> R { + let kv = self.kv.lock().unwrap(); + f(&*kv) + } + + pub fn set_all(&self, iter: impl Iterator) { + let mut kv = self.kv.lock().unwrap(); + for (k, v) in iter { + kv.set(&k, v); + } + } + + pub(crate) fn contains_key(&self, key: &[u8]) -> bool { + self.kv.lock().unwrap().contains_key(key) + } +} diff --git a/crates/loro-internal/src/utils/mod.rs b/crates/loro-internal/src/utils/mod.rs index 280cdbf2..f8101eff 100644 --- a/crates/loro-internal/src/utils/mod.rs +++ b/crates/loro-internal/src/utils/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod kv_wrapper; pub(crate) mod lazy; pub(crate) mod query_by_len; pub mod string_slice;