Refactor: use kv internally for docstate (#426)

* refactor: use kv in state

* refactor: do not load the state into the inner fxhashmap if not needed

* refactor: calc offset without unsafe code

* style: replace unsafe code
This commit is contained in:
Zixuan Chen 2024-08-24 14:16:06 +08:00 committed by GitHub
parent 95bec549d7
commit 1812caea65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 636 additions and 383 deletions

View file

@ -2,6 +2,7 @@
"cSpell.words": [
"arbtest",
"bolds",
"bools",
"cids",
"clippy",
"collab",

View file

@ -1,4 +1,4 @@
use std::{fmt::Display, io::Write, sync::Arc};
use std::{fmt::Display, io::Write, str::Bytes, sync::Arc};
use arbitrary::Arbitrary;
use enum_as_inner::EnumAsInner;
@ -243,7 +243,8 @@ impl ContainerID {
name,
container_type,
} => {
writer.write_all(&[0, container_type.to_u8()])?;
let first_byte = container_type.to_u8() | 0b10000000;
writer.write_all(&[first_byte])?;
leb128::write::unsigned(writer, name.len() as u64)?;
writer.write_all(name.as_bytes())?;
}
@ -252,14 +253,50 @@ impl ContainerID {
counter,
container_type,
} => {
writer.write_all(&[1, container_type.to_u8()])?;
let first_byte = container_type.to_u8();
writer.write_all(&[first_byte])?;
writer.write_all(&peer.to_le_bytes())?;
leb128::write::unsigned(writer, *counter as u64)?;
writer.write_all(&counter.to_le_bytes())?;
}
}
Ok(())
}
/// Serializes this id into a freshly allocated byte buffer by running
/// `encode` against an in-memory `Vec<u8>`.
pub fn to_bytes(&self) -> Vec<u8> {
    let mut buf = Vec::new();
    self.encode(&mut buf).unwrap();
    buf
}
/// Rebuilds a `ContainerID` from its binary form.
///
/// Layout read by this function:
/// - byte 0: the highest bit marks a root container, the low seven bits are
///   handed to `ContainerType::try_from_u8`;
/// - root: a LEB128 name length followed by that many UTF-8 bytes;
/// - normal: an 8-byte little-endian peer id followed by a 4-byte
///   little-endian counter.
///
/// # Panics
///
/// Panics when `bytes` is empty or truncated, when the type byte is rejected,
/// when the length prefix cannot be read, or when the name is not valid UTF-8.
pub fn from_bytes(bytes: &[u8]) -> Self {
    const ROOT_FLAG: u8 = 0b10000000;

    let header = bytes[0];
    let container_type = ContainerType::try_from_u8(header & !ROOT_FLAG).unwrap();
    let mut payload = &bytes[1..];

    if (header & ROOT_FLAG) != 0 {
        let len = leb128::read::unsigned(&mut payload).unwrap();
        let text = std::str::from_utf8(&payload[..len as usize]).unwrap();
        return Self::Root {
            name: InternalString::from(text),
            container_type,
        };
    }

    let peer = PeerID::from_le_bytes(payload[..8].try_into().unwrap());
    let counter = i32::from_le_bytes(payload[8..12].try_into().unwrap());
    Self::Normal {
        peer,
        counter,
        container_type,
    }
}
}
impl std::fmt::Debug for ContainerID {
@ -352,9 +389,7 @@ impl ContainerType {
4 => Ok(ContainerType::MovableList),
#[cfg(feature = "counter")]
5 => Ok(ContainerType::Counter),
_ => Err(LoroError::DecodeError(
format!("Unknown container type {v}").into_boxed_str(),
)),
x => Ok(ContainerType::Unknown(x)),
}
}
}
@ -617,7 +652,7 @@ pub mod wasm {
#[cfg(test)]
mod test {
use crate::ContainerID;
use crate::{ContainerID, ContainerType, ID};
#[test]
fn test_container_id_convert_to_and_from_str() {
@ -654,4 +689,38 @@ mod test {
assert!(ContainerID::try_from("id:0@0:Map").is_err());
assert!(ContainerID::try_from("cid:0@0:Unknown(6)").is_ok());
}
#[test]
fn test_container_id_encode_and_decode() {
    /// Encodes `id`, decodes the bytes again and requires the same id back.
    #[track_caller]
    fn assert_round_trip(id: ContainerID) {
        let encoded = id.to_bytes();
        assert_eq!(ContainerID::from_bytes(&encoded), id);
    }

    assert_round_trip(ContainerID::new_normal(ID::new(1, 2), ContainerType::Map));
    // Largest peer id and counter.
    assert_round_trip(ContainerID::new_normal(
        ID::new(u64::MAX, i32::MAX),
        ContainerType::Text,
    ));
    assert_round_trip(ContainerID::new_root("test_root", ContainerType::List));
    // Smallest peer id and counter.
    assert_round_trip(ContainerID::new_normal(
        ID::new(0, 0),
        ContainerType::MovableList,
    ));
    // A name long enough that its length prefix needs more than one byte.
    assert_round_trip(ContainerID::new_root(
        &"x".repeat(1024),
        ContainerType::Tree,
    ));
    #[cfg(feature = "counter")]
    assert_round_trip(ContainerID::new_normal(
        ID::new(42, 100),
        ContainerType::Counter,
    ));
    assert_round_trip(ContainerID::new_normal(
        ID::new(1, 1),
        ContainerType::Unknown(100),
    ));
}
}

View file

@ -706,10 +706,10 @@ impl DocState {
self.store.iter_and_decode_all()
}
pub(crate) fn iter_wrapper_mut(
pub(crate) fn iter_all_containers_mut(
&mut self,
) -> impl Iterator<Item = (&ContainerIdx, &mut ContainerWrapper)> {
self.store.iter_mut()
self.store.iter_all_containers()
}
pub(crate) fn init_container(
@ -792,7 +792,7 @@ impl DocState {
if self.is_recording() {
let diff: Vec<_> = self
.store
.iter_mut()
.iter_all_containers()
.map(|(&idx, state)| InternalContainerDiff {
idx,
bring_back: false,

View file

@ -40,7 +40,7 @@ impl DocAnalysis {
let mut containers = FxHashMap::default();
let mut state = doc.app_state().lock().unwrap();
let alive_containers = state.get_all_alive_containers();
for (&idx, c) in state.iter_wrapper_mut() {
for (&idx, c) in state.iter_all_containers_mut() {
let ops_num = ops_nums.get(&idx).unwrap_or(&0);
let id = doc.arena().get_container_id(idx).unwrap();
let dropped = !alive_containers.contains(&id);

View file

@ -1,8 +1,6 @@
use std::{
cmp::Ordering,
sync::{atomic::AtomicU64, Arc},
};
#[cfg(feature = "counter")]
use super::counter_state::CounterState;
use super::{ContainerCreationContext, MovableListState, State, TreeState};
use crate::{
arena::SharedArena,
configure::Configure,
@ -10,17 +8,15 @@ use crate::{
state::{FastStateSnapshot, RichtextState},
};
use bytes::Bytes;
use encode::{decode_cids, CidOffsetEncoder};
use fxhash::FxHashMap;
use inner_store::InnerStore;
use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue};
use std::sync::{atomic::AtomicU64, Arc};
use super::{
unknown_state::UnknownState, ContainerCreationContext, ContainerState, ListState, MapState,
MovableListState, State, TreeState,
};
pub(crate) use container_wrapper::ContainerWrapper;
#[cfg(feature = "counter")]
use super::counter_state::CounterState;
mod container_wrapper;
mod inner_store;
/// Encoding Schema for Container Store
///
@ -55,10 +51,9 @@ use super::counter_state::CounterState;
/// │ │
/// │ │
/// └───────────────────────────────────────────────────┘
#[derive(Clone)]
pub(crate) struct ContainerStore {
arena: SharedArena,
store: FxHashMap<ContainerIdx, ContainerWrapper>,
store: InnerStore,
conf: Configure,
peer: Arc<AtomicU64>,
}
@ -83,120 +78,42 @@ macro_rules! ctx {
impl ContainerStore {
pub fn new(arena: SharedArena, conf: Configure, peer: Arc<AtomicU64>) -> Self {
ContainerStore {
store: InnerStore::new(arena.clone()),
arena,
store: Default::default(),
conf,
peer,
}
}
pub fn get_container_mut(&mut self, idx: ContainerIdx) -> Option<&mut State> {
self.store.get_mut(&idx).map(|x| {
x.get_state_mut(
idx,
ContainerCreationContext {
configure: &self.conf,
peer: self.peer.load(std::sync::atomic::Ordering::Relaxed),
},
)
})
self.store
.get_mut(idx)
.map(|x| x.get_state_mut(idx, ctx!(self)))
}
#[allow(unused)]
pub fn get_container(&mut self, idx: ContainerIdx) -> Option<&State> {
self.store.get_mut(&idx).map(|x| {
x.get_state(
idx,
ContainerCreationContext {
configure: &self.conf,
peer: self.peer.load(std::sync::atomic::Ordering::Relaxed),
},
)
})
self.store
.get_mut(idx)
.map(|x| x.get_state(idx, ctx!(self)))
}
pub fn get_value(&mut self, idx: ContainerIdx) -> Option<LoroValue> {
self.store.get_mut(&idx).map(|c| {
c.get_value(
idx,
ContainerCreationContext {
configure: &self.conf,
peer: self.peer.load(std::sync::atomic::Ordering::Relaxed),
},
)
})
self.store
.get_mut(idx)
.map(|c| c.get_value(idx, ctx!(self)))
}
pub fn encode(&mut self) -> Bytes {
let mut id_bytes_pairs = Vec::with_capacity(self.store.len());
for (idx, container) in self.store.iter_mut() {
let id = self.arena.get_container_id(*idx).unwrap();
let encoded = container.encode();
#[cfg(debug_assertions)]
{
let mut new = ContainerWrapper::new_from_bytes(encoded.clone());
let value = new.get_value(*idx, ctx!(self));
assert_eq!(value, container.get_value(*idx, ctx!(self)));
}
id_bytes_pairs.push((id, encoded))
}
id_bytes_pairs.sort_by(|(a, _), (b, _)| a.cmp(b));
let mut id_encoder = CidOffsetEncoder::new();
let mut offset = 0;
for (id, bytes) in id_bytes_pairs.iter() {
id_encoder.push(id, offset);
offset += bytes.len();
}
let mut bytes = Vec::with_capacity(self.store.len() * 4 + 4);
// prepend 4 zeros
bytes.resize(4, 0);
// append the encoded cids
id_encoder.write_to_io(&mut bytes);
// set the first 4 bytes of bytes to the length of itself
let len = bytes.len() as u32;
bytes[0] = (len & 0xff) as u8;
bytes[1] = ((len >> 8) & 0xff) as u8;
bytes[2] = ((len >> 16) & 0xff) as u8;
bytes[3] = ((len >> 24) & 0xff) as u8;
for (_id, b) in id_bytes_pairs.iter() {
bytes.extend_from_slice(b);
}
bytes.into()
self.store.encode()
}
pub fn decode(&mut self, bytes: Bytes) -> LoroResult<()> {
assert!(self.is_empty());
let offset = u32::from_le_bytes((&bytes[..4]).try_into().unwrap()) as usize;
let cids = &bytes[4..offset];
let cids = decode_cids(cids)?;
let container_bytes = bytes.slice(offset..);
for (i, (cid, offset_start)) in cids.iter().enumerate() {
let offset_end = if i < cids.len() - 1 {
cids[i + 1].1
} else {
container_bytes.len()
};
let container =
ContainerWrapper::new_from_bytes(container_bytes.slice(*offset_start..offset_end));
let idx = self.arena.register_container(cid);
let p = container
.parent
.as_ref()
.map(|p| self.arena.register_container(p));
self.arena.set_parent(idx, p);
self.store.insert(idx, container);
}
Ok(())
self.store.decode(bytes)
}
pub fn iter_and_decode_all(&mut self) -> impl Iterator<Item = &mut State> {
self.store.iter_mut().map(|(idx, v)| {
self.store.iter_all_containers_mut().map(|(idx, v)| {
v.get_state_mut(
*idx,
ContainerCreationContext {
@ -215,19 +132,15 @@ impl ContainerStore {
self.store.len()
}
#[allow(unused)]
pub fn iter(&self) -> impl Iterator<Item = (&ContainerIdx, &ContainerWrapper)> {
self.store.iter()
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&ContainerIdx, &mut ContainerWrapper)> {
self.store.iter_mut()
pub fn iter_all_containers(
&mut self,
) -> impl Iterator<Item = (&ContainerIdx, &mut ContainerWrapper)> {
self.store.iter_all_containers_mut()
}
pub(super) fn get_or_create_mut(&mut self, idx: ContainerIdx) -> &mut State {
self.store
.entry(idx)
.or_insert_with(|| {
.get_or_insert_with(idx, || {
let state = super::create_state_(
idx,
&self.conf,
@ -240,8 +153,7 @@ impl ContainerStore {
pub(super) fn get_or_create_imm(&mut self, idx: ContainerIdx) -> &State {
self.store
.entry(idx)
.or_insert_with(|| {
.get_or_insert_with(idx, || {
let state = super::create_state_(
idx,
&self.conf,
@ -253,16 +165,7 @@ impl ContainerStore {
}
pub(crate) fn estimate_size(&self) -> usize {
self.store.len() * 4
+ self
.store
.values()
.map(|v| v.estimate_size())
.sum::<usize>()
}
pub(super) fn contains(&self, idx: ContainerIdx) -> bool {
self.store.contains_key(&idx)
self.store.estimate_size()
}
pub(crate) fn fork(
@ -271,14 +174,9 @@ impl ContainerStore {
peer: Arc<AtomicU64>,
config: Configure,
) -> ContainerStore {
let mut store = FxHashMap::default();
for (idx, container) in self.store.iter() {
store.insert(*idx, container.clone());
}
ContainerStore {
store: self.store.fork(arena.clone()),
arena,
store,
conf: config,
peer,
}
@ -290,12 +188,12 @@ impl ContainerStore {
panic!("store len mismatch");
}
for (idx, container) in self.store.iter_mut() {
for (idx, container) in self.store.iter_all_containers_mut() {
let id = self.arena.get_container_id(*idx).unwrap();
let other_idx = other.arena.register_container(&id);
let other_container = other
.store
.get_mut(&other_idx)
.get_mut(other_idx)
.expect("container not found on other store");
let other_id = other.arena.get_container_id(other_idx).unwrap();
assert_eq!(
@ -319,7 +217,7 @@ impl ContainerStore {
other_container
.decode_state(other_idx, ctx!(other))
.unwrap();
other_container.bytes = None;
other_container.clear_bytes();
if container.encode() != other_container.encode() {
panic!(
"container mismatch Origin: {:#?}, New: {:#?}",
@ -330,242 +228,6 @@ impl ContainerStore {
}
}
#[derive(Clone, Debug)]
pub(crate) struct ContainerWrapper {
depth: usize,
kind: ContainerType,
parent: Option<ContainerID>,
/// The possible combinations of is_some() are:
///
/// 1. bytes: new container decoded from bytes
/// 2. bytes + value: new container decoded from bytes, with value decoded
/// 3. state + bytes + value: new container decoded from bytes, with value and state decoded
/// 4. state
bytes: Option<Bytes>,
value: Option<LoroValue>,
bytes_offset_for_value: Option<usize>,
bytes_offset_for_state: Option<usize>,
state: Option<State>,
}
impl ContainerWrapper {
pub fn new(state: State, arena: &SharedArena) -> Self {
let idx = state.container_idx();
let parent = arena
.get_parent(idx)
.and_then(|p| arena.get_container_id(p));
let depth = arena.get_depth(idx).unwrap().get() as usize;
Self {
depth,
parent,
kind: idx.get_type(),
state: Some(state),
bytes: None,
value: None,
bytes_offset_for_state: None,
bytes_offset_for_value: None,
}
}
pub fn depth(&self) -> usize {
self.depth
}
/// It will not decode the state if it is not decoded
#[allow(unused)]
pub fn try_get_state(&self) -> Option<&State> {
self.state.as_ref()
}
/// It will decode the state if it is not decoded
pub fn get_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &State {
self.decode_state(idx, ctx).unwrap();
self.state.as_ref().expect("ContainerWrapper is empty")
}
/// It will decode the state if it is not decoded
pub fn get_state_mut(
&mut self,
idx: ContainerIdx,
ctx: ContainerCreationContext,
) -> &mut State {
self.decode_state(idx, ctx).unwrap();
self.bytes = None;
self.value = None;
self.state.as_mut().unwrap()
}
pub fn get_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroValue {
if let Some(v) = self.value.as_ref() {
return v.clone();
}
self.decode_value(idx, ctx).unwrap();
if self.value.is_none() {
return self.state.as_mut().unwrap().get_value();
}
self.value.as_ref().unwrap().clone()
}
pub fn encode(&mut self) -> Bytes {
if let Some(bytes) = self.bytes.as_ref() {
return bytes.clone();
}
// ContainerType
// Depth
// ParentID
// StateSnapshot
let mut output = Vec::new();
output.push(self.kind.to_u8());
leb128::write::unsigned(&mut output, self.depth as u64).unwrap();
postcard::to_io(&self.parent, &mut output).unwrap();
self.state
.as_mut()
.unwrap()
.encode_snapshot_fast(&mut output);
let ans: Bytes = output.into();
self.bytes = Some(ans.clone());
ans
}
pub fn new_from_bytes(b: Bytes) -> Self {
let src: &[u8] = &b;
let bytes: &[u8] = &b;
let kind = ContainerType::try_from_u8(bytes[0]).unwrap();
let mut bytes = &bytes[1..];
let depth = leb128::read::unsigned(&mut bytes).unwrap();
let (parent, bytes) = postcard::take_from_bytes(bytes).unwrap();
// SAFETY: bytes is a slice of b
let size = unsafe { bytes.as_ptr().offset_from(src.as_ptr()) };
Self {
depth: depth as usize,
kind,
parent,
state: None,
value: None,
bytes: Some(b.clone()),
bytes_offset_for_value: Some(size as usize),
bytes_offset_for_state: None,
}
}
#[allow(unused)]
pub fn ensure_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &LoroValue {
if self.value.is_some() {
} else if self.state.is_some() {
let value = self.state.as_mut().unwrap().get_value();
self.value = Some(value);
} else {
self.decode_value(idx, ctx).unwrap();
}
self.value.as_ref().unwrap()
}
fn decode_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroResult<()> {
if self.value.is_some() || self.state.is_some() {
return Ok(());
}
let Some(b) = self.bytes.as_ref() else {
return Ok(());
};
if self.bytes_offset_for_value.is_none() {
let src: &[u8] = b;
let mut bytes: &[u8] = b;
bytes = &bytes[1..];
let _depth = leb128::read::unsigned(&mut bytes).unwrap();
let (_parent, bytes) = postcard::take_from_bytes::<Option<ContainerID>>(bytes).unwrap();
// SAFETY: bytes is a slice of b
let size = unsafe { bytes.as_ptr().offset_from(src.as_ptr()) };
self.bytes_offset_for_value = Some(size as usize);
}
let value_offset = self.bytes_offset_for_value.unwrap();
let b = &b[value_offset..];
let (v, rest) = match self.kind {
ContainerType::Text => RichtextState::decode_value(b)?,
ContainerType::Map => MapState::decode_value(b)?,
ContainerType::List => ListState::decode_value(b)?,
ContainerType::MovableList => MovableListState::decode_value(b)?,
ContainerType::Tree => {
let mut state = TreeState::decode_snapshot_fast(idx, (LoroValue::Null, b), ctx)?;
self.value = Some(state.get_value());
self.state = Some(State::TreeState(Box::new(state)));
self.bytes_offset_for_state = Some(value_offset);
return Ok(());
}
#[cfg(feature = "counter")]
ContainerType::Counter => {
let (v, _rest) = CounterState::decode_value(b)?;
self.value = Some(v);
self.bytes_offset_for_state = Some(0);
return Ok(());
}
ContainerType::Unknown(_) => UnknownState::decode_value(b)?,
};
self.value = Some(v);
// SAFETY: rest is a slice of b
let offset = unsafe { rest.as_ptr().offset_from(b.as_ptr()) };
self.bytes_offset_for_state = Some(offset as usize + value_offset);
Ok(())
}
fn decode_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroResult<()> {
if self.state.is_some() {
return Ok(());
}
if self.value.is_none() {
self.decode_value(idx, ctx)?;
}
let b = self.bytes.as_ref().unwrap();
let offset = self.bytes_offset_for_state.unwrap();
let b = &b[offset..];
let v = self.value.as_ref().unwrap().clone();
let state: State = match self.kind {
ContainerType::Text => RichtextState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
ContainerType::Map => MapState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
ContainerType::List => ListState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
ContainerType::MovableList => {
MovableListState::decode_snapshot_fast(idx, (v, b), ctx)?.into()
}
ContainerType::Tree => TreeState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
#[cfg(feature = "counter")]
ContainerType::Counter => CounterState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
ContainerType::Unknown(_) => {
UnknownState::decode_snapshot_fast(idx, (v, b), ctx)?.into()
}
};
self.state = Some(state);
Ok(())
}
pub fn estimate_size(&self) -> usize {
if let Some(bytes) = self.bytes.as_ref() {
return bytes.len();
}
self.state.as_ref().unwrap().estimate_size()
}
#[allow(unused)]
pub(crate) fn is_state_empty(&self) -> bool {
if let Some(state) = self.state.as_ref() {
return state.is_state_empty();
}
// FIXME: it's not very accurate...
self.bytes.as_ref().unwrap().len() > 10
}
}
mod encode {
use loro_common::{ContainerID, ContainerType, Counter, InternalString, LoroError, LoroResult};
use serde::{Deserialize, Serialize};

View file

@ -0,0 +1,275 @@
use bytes::Bytes;
use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue};
#[cfg(feature = "counter")]
use crate::state::counter_state::CounterState;
use crate::{
arena::SharedArena,
container::idx::ContainerIdx,
state::{
unknown_state::UnknownState, ContainerCreationContext, ContainerState, FastStateSnapshot,
ListState, MapState, MovableListState, RichtextState, State, TreeState,
},
};
/// A container that may exist as encoded bytes, as a decoded value, as a
/// fully decoded `State`, or as several of these at once.
#[derive(Clone, Debug)]
pub(crate) struct ContainerWrapper {
/// Depth of the container; written right after the kind byte when encoding.
depth: usize,
/// Container type; its `to_u8` form is the first encoded byte.
kind: ContainerType,
/// Id of the parent container, `None` when there is no parent.
parent: Option<ContainerID>,
/// The possible combinations of is_some() are:
///
/// 1. bytes: new container decoded from bytes
/// 2. bytes + value: new container decoded from bytes, with value decoded
/// 3. state + bytes + value: new container decoded from bytes, with value and state decoded
/// 4. state
bytes: Option<Bytes>,
/// Decoded value, kept so that it can be returned without decoding again.
value: Option<LoroValue>,
/// Offset into `bytes` where the header (kind, depth, parent) ends.
bytes_offset_for_value: Option<usize>,
/// Offset into `bytes` that state decoding starts reading from.
bytes_offset_for_state: Option<usize>,
/// Fully decoded state, if it has been decoded or was supplied directly.
state: Option<State>,
/// `true` for wrappers built by `new_from_bytes`; `false` for wrappers built
/// by `new`, and reset to `false` by `get_state_mut`.
flushed: bool,
}
impl ContainerWrapper {
    /// Wraps an already decoded `state`.
    ///
    /// The parent id and the depth are looked up in `arena`. The wrapper
    /// starts without cached bytes or value and is not marked as flushed.
    ///
    /// # Panics
    ///
    /// Panics if `arena` has no depth for the container.
    pub fn new(state: State, arena: &SharedArena) -> Self {
        let idx = state.container_idx();
        let parent = arena
            .get_parent(idx)
            .and_then(|p| arena.get_container_id(p));
        let depth = arena.get_depth(idx).unwrap().get() as usize;
        Self {
            depth,
            parent,
            kind: idx.get_type(),
            state: Some(state),
            bytes: None,
            value: None,
            bytes_offset_for_state: None,
            bytes_offset_for_value: None,
            flushed: false,
        }
    }

    /// Depth of the wrapped container.
    pub fn depth(&self) -> usize {
        self.depth
    }

    /// It will not decode the state if it is not decoded
    #[allow(unused)]
    pub fn try_get_state(&self) -> Option<&State> {
        self.state.as_ref()
    }

    /// It will decode the state if it is not decoded
    ///
    /// # Panics
    ///
    /// Panics if decoding fails.
    pub fn get_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &State {
        self.decode_state(idx, ctx).unwrap();
        self.state.as_ref().expect("ContainerWrapper is empty")
    }

    /// It will decode the state if it is not decoded
    ///
    /// The caller may mutate the returned state, so the cached bytes and value
    /// are dropped and the wrapper is marked as not flushed.
    ///
    /// # Panics
    ///
    /// Panics if decoding fails.
    pub fn get_state_mut(
        &mut self,
        idx: ContainerIdx,
        ctx: ContainerCreationContext,
    ) -> &mut State {
        self.decode_state(idx, ctx).unwrap();
        self.bytes = None;
        self.value = None;
        self.flushed = false;
        self.state.as_mut().unwrap()
    }

    /// Returns the container value, preferring the cached value, then the
    /// value decoded from bytes, and finally the value computed by the state.
    ///
    /// # Panics
    ///
    /// Panics if decoding fails.
    pub fn get_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroValue {
        if let Some(v) = self.value.as_ref() {
            return v.clone();
        }

        self.decode_value(idx, ctx).unwrap();
        match self.value.as_ref() {
            Some(v) => v.clone(),
            // `decode_value` leaves `value` empty when the state is already decoded.
            None => self.state.as_mut().unwrap().get_value(),
        }
    }

    /// Returns the encoded form, reusing the cached bytes when present.
    ///
    /// Otherwise the layout written is: kind byte, LEB128 depth,
    /// postcard-encoded parent id, state snapshot. The result is cached.
    ///
    /// # Panics
    ///
    /// Panics if neither cached bytes nor a decoded state are available.
    pub fn encode(&mut self) -> Bytes {
        if let Some(bytes) = self.bytes.as_ref() {
            return bytes.clone();
        }

        // ContainerType
        // Depth
        // ParentID
        // StateSnapshot
        let mut output = Vec::new();
        output.push(self.kind.to_u8());
        leb128::write::unsigned(&mut output, self.depth as u64).unwrap();
        postcard::to_io(&self.parent, &mut output).unwrap();
        self.state
            .as_mut()
            .unwrap()
            .encode_snapshot_fast(&mut output);
        let ans: Bytes = output.into();
        self.bytes = Some(ans.clone());
        ans
    }

    /// Reads the part of the header that follows the kind byte.
    ///
    /// Returns the depth, the parent id and the offset at which the header
    /// ends, measured from the start of `bytes`.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is empty or the header is malformed.
    fn parse_header(bytes: &[u8]) -> (usize, Option<ContainerID>, usize) {
        let mut reader = &bytes[1..];
        let depth = leb128::read::unsigned(&mut reader).unwrap();
        let (parent, rest) = postcard::take_from_bytes::<Option<ContainerID>>(reader).unwrap();
        (depth as usize, parent, bytes.len() - rest.len())
    }

    /// Extracts only the parent id from an encoded container.
    ///
    /// # Panics
    ///
    /// Panics if `b` is empty or the header is malformed.
    pub fn decode_parent(b: &[u8]) -> Option<ContainerID> {
        let (_depth, parent, _header_len) = Self::parse_header(b);
        parent
    }

    /// Builds a wrapper around encoded bytes, decoding only the header.
    ///
    /// Value and state stay undecoded; the wrapper is marked as flushed.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is empty or the header is malformed.
    pub fn new_from_bytes(bytes: Bytes) -> Self {
        let kind = ContainerType::try_from_u8(bytes[0]).unwrap();
        let (depth, parent, header_len) = Self::parse_header(&bytes);
        Self {
            depth,
            kind,
            parent,
            state: None,
            value: None,
            // `bytes` is owned and no longer borrowed here, so it is moved, not cloned.
            bytes: Some(bytes),
            bytes_offset_for_value: Some(header_len),
            bytes_offset_for_state: None,
            flushed: true,
        }
    }

    /// Makes sure `value` is populated, from the state or from the bytes, and
    /// returns a reference to it.
    ///
    /// # Panics
    ///
    /// Panics if decoding fails or if no value can be produced.
    #[allow(unused)]
    pub fn ensure_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &LoroValue {
        if self.value.is_none() {
            if let Some(state) = self.state.as_mut() {
                self.value = Some(state.get_value());
            } else {
                self.decode_value(idx, ctx).unwrap();
            }
        }

        self.value.as_ref().unwrap()
    }

    /// Decodes the value from the cached bytes and records where state
    /// decoding has to start.
    ///
    /// Does nothing when a value or a state already exists, or when there are
    /// no bytes to decode from.
    fn decode_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroResult<()> {
        if self.value.is_some() || self.state.is_some() {
            return Ok(());
        }

        let Some(bytes) = self.bytes.as_ref() else {
            return Ok(());
        };

        let value_offset = match self.bytes_offset_for_value {
            Some(offset) => offset,
            None => {
                let (_depth, _parent, offset) = Self::parse_header(bytes);
                self.bytes_offset_for_value = Some(offset);
                offset
            }
        };

        let b = &bytes[value_offset..];
        let (v, rest) = match self.kind {
            ContainerType::Text => RichtextState::decode_value(b)?,
            ContainerType::Map => MapState::decode_value(b)?,
            ContainerType::List => ListState::decode_value(b)?,
            ContainerType::MovableList => MovableListState::decode_value(b)?,
            ContainerType::Tree => {
                // The tree value is taken from the fully decoded state, so both are stored.
                let mut state = TreeState::decode_snapshot_fast(idx, (LoroValue::Null, b), ctx)?;
                self.value = Some(state.get_value());
                self.state = Some(State::TreeState(Box::new(state)));
                self.bytes_offset_for_state = Some(value_offset);
                return Ok(());
            }
            #[cfg(feature = "counter")]
            ContainerType::Counter => {
                let (v, _rest) = CounterState::decode_value(b)?;
                self.value = Some(v);
                // NOTE(review): offset 0 is the start of the whole buffer, unlike the
                // other kinds; presumably the counter state is rebuilt from the value
                // alone. Confirm.
                self.bytes_offset_for_state = Some(0);
                return Ok(());
            }
            ContainerType::Unknown(_) => UnknownState::decode_value(b)?,
        };

        self.value = Some(v);
        // `rest` is what follows the value, so the difference is the value's length.
        let offset = b.len() - rest.len();
        self.bytes_offset_for_state = Some(offset + value_offset);
        Ok(())
    }

    /// Decodes the state from the cached bytes unless it is already decoded.
    ///
    /// The value is decoded first when missing, because state decoding
    /// receives it together with the remaining bytes.
    ///
    /// # Panics
    ///
    /// Panics if the state is missing and there are no cached bytes.
    pub(super) fn decode_state(
        &mut self,
        idx: ContainerIdx,
        ctx: ContainerCreationContext,
    ) -> LoroResult<()> {
        if self.state.is_some() {
            return Ok(());
        }

        if self.value.is_none() {
            self.decode_value(idx, ctx)?;
        }

        let b = self.bytes.as_ref().unwrap();
        let offset = self.bytes_offset_for_state.unwrap();
        let b = &b[offset..];
        let v = self.value.as_ref().unwrap().clone();
        let state: State = match self.kind {
            ContainerType::Text => RichtextState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
            ContainerType::Map => MapState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
            ContainerType::List => ListState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
            ContainerType::MovableList => {
                MovableListState::decode_snapshot_fast(idx, (v, b), ctx)?.into()
            }
            ContainerType::Tree => TreeState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
            #[cfg(feature = "counter")]
            ContainerType::Counter => CounterState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
            ContainerType::Unknown(_) => {
                UnknownState::decode_snapshot_fast(idx, (v, b), ctx)?.into()
            }
        };
        self.state = Some(state);
        Ok(())
    }

    /// Size estimate: the length of the cached bytes when present, otherwise
    /// the estimate reported by the state.
    ///
    /// # Panics
    ///
    /// Panics if neither bytes nor state are present.
    pub fn estimate_size(&self) -> usize {
        if let Some(bytes) = self.bytes.as_ref() {
            return bytes.len();
        }

        self.state.as_ref().unwrap().estimate_size()
    }

    /// Reports whether the container holds no content.
    ///
    /// A decoded state answers directly. Without one, the encoded length is
    /// used as a heuristic: a buffer of at most 10 bytes is treated as empty.
    /// The previous comparison (`> 10`) reported long buffers as empty, which
    /// was inverted.
    ///
    /// # Panics
    ///
    /// Panics if neither bytes nor state are present.
    #[allow(unused)]
    pub(crate) fn is_state_empty(&self) -> bool {
        if let Some(state) = self.state.as_ref() {
            return state.is_state_empty();
        }

        // FIXME: it's not very accurate...
        self.bytes.as_ref().unwrap().len() <= 10
    }

    /// Drops the cached bytes and both offsets so that the next `encode`
    /// re-serializes the state.
    ///
    /// # Panics
    ///
    /// Panics if the state is not decoded, since nothing could be encoded then.
    pub(crate) fn clear_bytes(&mut self) {
        assert!(self.state.is_some());
        self.bytes = None;
        self.bytes_offset_for_state = None;
        self.bytes_offset_for_value = None;
    }

    /// Whether the wrapper was built from bytes and has not been handed out
    /// through `get_state_mut` since.
    pub(crate) fn is_flushed(&self) -> bool {
        self.flushed
    }

    /// Id of the parent container, if any.
    pub(crate) fn parent(&self) -> Option<&ContainerID> {
        self.parent.as_ref()
    }
}

View file

@ -0,0 +1,174 @@
use std::ops::Bound;
use bytes::Bytes;
use fxhash::FxHashMap;
use loro_common::ContainerID;
use crate::{arena::SharedArena, container::idx::ContainerIdx, utils::kv_wrapper::KvWrapper};
use super::ContainerWrapper;
/// The invariants about this struct:
///
/// - `len` is the number of containers in the store. If a container is in both kv and store,
/// it should only take 1 space in `len`.
/// - `kv` is either the same or older than `store`.
/// - if `all_loaded` is true, then `store` contains all the entries from `kv`
pub(super) struct InnerStore {
// Used to translate between `ContainerIdx` and `ContainerID`.
arena: SharedArena,
// Containers that are currently held in memory.
store: FxHashMap<ContainerIdx, ContainerWrapper>,
// Key-value store of encoded containers, keyed by `ContainerID::to_bytes`.
kv: KvWrapper,
// Number of containers, counted once even when present in both `kv` and `store`.
len: usize,
// When true, `kv` is not consulted on a miss in `store`.
all_loaded: bool,
}
/// Prints the bookkeeping fields only; the containers and the kv store are
/// left out so that formatting never has to lock or decode anything.
impl std::fmt::Debug for InnerStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InnerStore")
            .field("len", &self.len)
            .field("all_loaded", &self.all_loaded)
            .field("in_memory", &self.store.len())
            .finish_non_exhaustive()
    }
}
/// This impl block contains all the mutation code that may break the invariants of this struct
impl InnerStore {
    /// Returns the wrapper for `idx`, creating it when it does not exist yet.
    ///
    /// Lookup order: the in-memory map, then the kv store (only while
    /// `all_loaded` is false), and finally `f`. Only a wrapper produced by `f`
    /// increases `len`, because an entry found in kv is already counted.
    ///
    /// # Panics
    ///
    /// Panics if the kv store has to be consulted and `idx` has no id in the
    /// arena.
    pub(super) fn get_or_insert_with(
        &mut self,
        idx: ContainerIdx,
        f: impl FnOnce() -> ContainerWrapper,
    ) -> &mut ContainerWrapper {
        use std::collections::hash_map::Entry;

        match self.store.entry(idx) {
            Entry::Occupied(slot) => slot.into_mut(),
            Entry::Vacant(slot) => {
                // The key is only built when kv can hold entries missing from `store`.
                let persisted = if self.all_loaded {
                    None
                } else {
                    let id = self.arena.get_container_id(idx).unwrap();
                    self.kv.get(&id.to_bytes())
                };
                let wrapper = match persisted {
                    Some(bytes) => ContainerWrapper::new_from_bytes(bytes),
                    None => {
                        let created = f();
                        self.len += 1;
                        created
                    }
                };
                slot.insert(wrapper)
            }
        }
    }

    /// Returns the wrapper for `idx`, loading it from the kv store when it is
    /// not in memory yet. Returns `None` when the container is in neither.
    ///
    /// # Panics
    ///
    /// Panics if the kv store has to be consulted and `idx` has no id in the
    /// arena.
    pub(crate) fn get_mut(&mut self, idx: ContainerIdx) -> Option<&mut ContainerWrapper> {
        use std::collections::hash_map::Entry;

        match self.store.entry(idx) {
            Entry::Occupied(slot) => Some(slot.into_mut()),
            Entry::Vacant(slot) => {
                if self.all_loaded {
                    return None;
                }
                let id = self.arena.get_container_id(idx).unwrap();
                let bytes = self.kv.get(&id.to_bytes())?;
                Some(slot.insert(ContainerWrapper::new_from_bytes(bytes)))
            }
        }
    }

    /// Loads every container from the kv store and iterates over all of them.
    pub(crate) fn iter_all_containers_mut(
        &mut self,
    ) -> impl Iterator<Item = (&ContainerIdx, &mut ContainerWrapper)> {
        self.load_all();
        self.store.iter_mut()
    }

    /// Writes every in-memory container that is not flushed into the kv store
    /// and returns the exported kv store.
    ///
    /// # Panics
    ///
    /// Panics if an in-memory container has no id in the arena.
    pub(crate) fn encode(&mut self) -> Bytes {
        // NOTE(review): the wrappers written here stay unflushed, so a later
        // `encode` writes them again and `estimate_size` counts them on top of
        // `kv.size()`. Confirm whether that is intended.
        self.kv
            .set_all(self.store.iter_mut().filter_map(|(idx, c)| {
                if c.is_flushed() {
                    return None;
                }

                let key = self.arena.get_container_id(*idx).unwrap();
                let key: Bytes = key.to_bytes().into();
                let value = c.encode();
                Some((key, value))
            }));
        self.kv.export()
    }

    /// Imports `bytes` into the kv store and registers every container and its
    /// parent in the arena, without decoding any container into memory.
    ///
    /// Afterwards `len` equals the number of imported entries and
    /// `all_loaded` is false, so lookups fall back to the kv store.
    ///
    /// # Panics
    ///
    /// Panics if the store already contains containers, or if an entry cannot
    /// be parsed.
    pub(crate) fn decode(&mut self, bytes: bytes::Bytes) -> Result<(), loro_common::LoroError> {
        assert!(self.len == 0);
        self.kv.import(bytes);
        self.kv.with_kv(|kv| {
            let mut count = 0;
            let iter = kv.scan(Bound::Unbounded, Bound::Unbounded);
            for (k, v) in iter {
                count += 1;
                let cid = ContainerID::from_bytes(&k);
                // Only the header is read; the container body stays encoded.
                let parent = ContainerWrapper::decode_parent(&v);
                let idx = self.arena.register_container(&cid);
                let p = parent.as_ref().map(|p| self.arena.register_container(p));
                self.arena.set_parent(idx, p);
            }

            self.len = count;
        });

        self.all_loaded = false;
        Ok(())
    }

    /// Moves every kv entry that is not in memory yet into `store` and marks
    /// the store as fully loaded. Does nothing when that is already the case.
    fn load_all(&mut self) {
        if self.all_loaded {
            return;
        }

        self.kv.with_kv(|kv| {
            let iter = kv.scan(Bound::Unbounded, Bound::Unbounded);
            for (k, v) in iter {
                let cid = ContainerID::from_bytes(&k);
                let idx = self.arena.register_container(&cid);
                if self.store.contains_key(&idx) {
                    // the container is already loaded
                    // the content in `store` is guaranteed to be newer than the content in `kv`
                    continue;
                }

                let container = ContainerWrapper::new_from_bytes(v);
                self.store.insert(idx, container);
            }
        });

        self.all_loaded = true;
    }
}
impl InnerStore {
    /// Creates an empty store backed by an in-memory kv store.
    ///
    /// `all_loaded` starts as true: there is nothing in kv to load.
    pub(crate) fn new(arena: SharedArena) -> Self {
        let store = FxHashMap::default();
        let kv = KvWrapper::new_mem();
        Self {
            arena,
            store,
            kv,
            len: 0,
            all_loaded: true,
        }
    }

    /// Copies this store for use with another `arena`.
    ///
    /// The in-memory containers and the kv store are cloned; the counters are
    /// carried over unchanged.
    pub(crate) fn fork(&self, arena: SharedArena) -> InnerStore {
        let Self {
            arena: _,
            store,
            kv,
            len,
            all_loaded,
        } = self;
        let store = store.clone();
        let kv = kv.clone();
        InnerStore {
            arena,
            store,
            kv,
            len: *len,
            all_loaded: *all_loaded,
        }
    }

    /// Number of containers in the store.
    pub(crate) fn len(&self) -> usize {
        self.len
    }

    /// Whether the store holds no container at all.
    pub(crate) fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Size estimate: the size reported by the kv store plus the estimate of
    /// every in-memory container that is not flushed.
    pub(crate) fn estimate_size(&self) -> usize {
        let persisted = self.kv.with_kv(|kv| kv.size());
        let pending = self
            .store
            .values()
            .filter(|c| !c.is_flushed())
            .map(|c| c.estimate_size())
            .sum::<usize>();
        persisted + pending
    }
}

View file

@ -0,0 +1,71 @@
use std::{
collections::BTreeMap,
ops::Bound,
sync::{Arc, Mutex},
};
use bytes::Bytes;
use fxhash::FxHashMap;
use loro_common::ContainerID;
use crate::kv_store::KvStore;
// NOTE(review): nothing in this file refers to `Status`. The variant names
// resemble the bytes/value/state combinations tracked by `ContainerWrapper`,
// but that link is a guess. Confirm whether the enum is still needed.
pub(crate) enum Status {
BytesOnly,
ImmBoth,
MutState,
}
/// This thin wrapper aims to limit the ability to modify the kv store and make
/// it easy to find all the modifications.
///
/// Every method locks the inner mutex for the duration of the call.
pub(crate) struct KvWrapper {
// The wrapped store; `Clone` replaces this handle with the result of `clone_store`.
kv: Arc<Mutex<dyn KvStore>>,
}
/// Cloning delegates to the store's own `clone_store` instead of copying the
/// `Arc` handle.
impl Clone for KvWrapper {
    fn clone(&self) -> Self {
        let guard = self.kv.lock().unwrap();
        let kv = guard.clone_store();
        Self { kv }
    }
}
impl KvWrapper {
    /// Creates a wrapper around an empty in-memory `BTreeMap` store.
    pub fn new_mem() -> Self {
        Self {
            kv: Arc::new(Mutex::new(BTreeMap::new())),
        }
    }

    /// Loads `bytes` into the store.
    ///
    /// # Panics
    ///
    /// Panics if the store already contains entries.
    pub fn import(&self, bytes: Bytes) {
        let mut guard = self.kv.lock().unwrap();
        assert!(guard.len() == 0);
        guard.import_all(bytes);
    }

    /// Exports the whole store as bytes.
    pub fn export(&self) -> Bytes {
        self.kv.lock().unwrap().export_all()
    }

    /// Looks up the value stored under `key`.
    pub fn get(&self, key: &[u8]) -> Option<Bytes> {
        self.kv.lock().unwrap().get(key)
    }

    /// Runs `f` with read access to the store and returns its result.
    ///
    /// The lock is held while `f` runs.
    pub fn with_kv<R>(&self, f: impl FnOnce(&dyn KvStore) -> R) -> R {
        let guard = self.kv.lock().unwrap();
        let store: &dyn KvStore = &*guard;
        f(store)
    }

    /// Stores every pair produced by `iter`.
    ///
    /// The lock is taken before the first item is pulled and held until the
    /// iterator is exhausted.
    pub fn set_all(&self, iter: impl Iterator<Item = (Bytes, Bytes)>) {
        let mut guard = self.kv.lock().unwrap();
        iter.for_each(|(key, value)| {
            guard.set(&key, value);
        });
    }

    /// Whether the store has an entry for `key`.
    pub(crate) fn contains_key(&self, key: &[u8]) -> bool {
        let guard = self.kv.lock().unwrap();
        guard.contains_key(key)
    }
}

View file

@ -1,3 +1,4 @@
pub(crate) mod kv_wrapper;
pub(crate) mod lazy;
pub(crate) mod query_by_len;
pub mod string_slice;