feat: list fast snapshot

This commit is contained in:
Zixuan Chen 2024-06-13 16:16:52 +08:00
parent 4dd609ca51
commit 01427947f7
No known key found for this signature in database
2 changed files with 150 additions and 8 deletions

View file

@ -1,5 +1,6 @@
use std::{
borrow::Cow,
io::Write,
sync::{atomic::AtomicU8, Arc, Mutex, RwLock, Weak},
};
@ -84,6 +85,14 @@ impl std::fmt::Debug for DocState {
}
}
pub(crate) trait FastStateSnashot {
fn encode_snapshot_fast<W: Write>(&mut self, w: W);
fn decode_value(bytes: &[u8]) -> LoroResult<(LoroValue, &[u8])>;
fn decode_snapshot_fast(idx: ContainerIdx, v: (LoroValue, &[u8])) -> LoroResult<Self>
where
Self: Sized;
}
#[enum_dispatch]
pub(crate) trait ContainerState: Clone {
fn container_idx(&self) -> ContainerIdx;

View file

@ -1,9 +1,10 @@
use std::{
io::Write,
ops::RangeBounds,
sync::{Arc, Mutex, Weak},
};
use super::ContainerState;
use super::{ContainerState, FastStateSnashot};
use crate::{
arena::SharedArena,
container::{idx::ContainerIdx, list::list_op::ListOp, ContainerID},
@ -211,12 +212,31 @@ impl ListState {
.insert(value.into_container().unwrap(), leaf.leaf);
}
for leaf in data.arr {
let v = &self.list.get_elem(leaf).unwrap().v;
if v.is_container() {
assert!(data.arr.is_empty());
}
pub fn push(&mut self, value: LoroValue, id: IdFull) {
if self.list.is_empty() {
let idx = self.list.push(Elem {
v: value.clone(),
id,
});
if value.is_container() {
self.child_container_to_leaf
.insert(v.as_container().unwrap().clone(), leaf);
.insert(value.into_container().unwrap(), idx.leaf);
}
return;
}
let leaf = self.list.push(Elem {
v: value.clone(),
id,
});
if value.is_container() {
self.child_container_to_leaf
.insert(value.into_container().unwrap(), leaf.leaf);
}
}
@ -521,19 +541,102 @@ impl ContainerState for ListState {
}
}
mod snapshot {
use std::{borrow::Cow, io::Read};
use loro_common::{Counter, Lamport, PeerID};
use serde::Serialize;
use crate::encoding::value_register::ValueRegister;
use super::*;
impl FastStateSnashot for ListState {
fn encode_snapshot_fast<W: Write>(&mut self, mut w: W) {
let value = self.get_value();
postcard::to_io(&value, &mut w).unwrap();
let mut peers: ValueRegister<PeerID> = ValueRegister::new();
let mut id_bytes = Vec::new();
for elem in self.iter_with_id() {
let id = elem.id;
let peer_idx = peers.register(&id.peer);
leb128::write::unsigned(&mut id_bytes, peer_idx as u64).unwrap();
leb128::write::unsigned(&mut id_bytes, id.counter as u64).unwrap();
leb128::write::unsigned(&mut id_bytes, id.lamport as u64).unwrap();
}
let peers = peers.unwrap_vec();
leb128::write::unsigned(&mut w, peers.len() as u64).unwrap();
for peer in peers {
w.write_all(&peer.to_le_bytes()).unwrap();
}
w.write_all(&id_bytes).unwrap();
}
fn decode_snapshot_fast(
idx: ContainerIdx,
(v, mut bytes): (LoroValue, &[u8]),
) -> LoroResult<Self>
where
Self: Sized,
{
let peer_num = leb128::read::unsigned(&mut bytes).unwrap() as usize;
let mut peers = Vec::with_capacity(peer_num);
for _ in 0..peer_num {
let mut buf = [0u8; 8];
bytes.read_exact(&mut buf).unwrap();
peers.push(PeerID::from_le_bytes(buf));
}
let list = v.as_list().unwrap();
let mut ans = Self::new(idx);
let mut i = 0;
while !bytes.is_empty() {
let peer_idx = leb128::read::unsigned(&mut bytes).unwrap();
let counter = leb128::read::unsigned(&mut bytes).unwrap();
let lamport = leb128::read::unsigned(&mut bytes).unwrap();
ans.insert(
i,
list[i].clone(),
IdFull::new(
peers[peer_idx as usize],
counter as Counter,
lamport as Lamport,
),
);
i += 1;
}
Ok(ans)
}
fn decode_value(bytes: &[u8]) -> LoroResult<(LoroValue, &[u8])> {
postcard::take_from_bytes(bytes).map_err(|_| {
loro_common::LoroError::DecodeError(
"Decode list value failed".to_string().into_boxed_str(),
)
})
}
}
}
#[cfg(test)]
mod test {
use loro_common::{Counter, Lamport};
use super::*;
fn id(name: &str) -> ContainerID {
ContainerID::new_root(name, crate::ContainerType::List)
}
#[test]
fn test() {
let mut list = ListState::new(ContainerIdx::from_index_and_type(
0,
loro_common::ContainerType::List,
));
fn id(name: &str) -> ContainerID {
ContainerID::new_root(name, crate::ContainerType::List)
}
list.insert(0, LoroValue::Container(id("abc")), IdFull::new(0, 0, 0));
list.insert(0, LoroValue::Container(id("x")), IdFull::new(0, 0, 0));
assert_eq!(list.get_child_container_index(&id("x")), Some(0));
@ -542,4 +645,34 @@ mod test {
assert_eq!(list.get_child_container_index(&id("x")), Some(0));
assert_eq!(list.get_child_container_index(&id("abc")), Some(2));
}
#[test]
fn test_list_fast_snapshot() {
let mut list = ListState::new(ContainerIdx::from_index_and_type(
0,
loro_common::ContainerType::List,
));
list.insert(0, LoroValue::I64(0), IdFull::new(0, 0, 0));
list.insert(1, LoroValue::I64(2), IdFull::new(1, 1, 1));
let mut w = Vec::new();
list.encode_snapshot_fast(&mut w);
let (v, left) = ListState::decode_value(&w).unwrap();
let mut new_list = ListState::decode_snapshot_fast(
ContainerIdx::from_index_and_type(0, loro_common::ContainerType::List),
(v.clone(), left),
)
.unwrap();
new_list.check();
assert_eq!(
new_list.get_value(),
vec![LoroValue::I64(0), LoroValue::I64(2)].into()
);
assert_eq!(new_list.get_value(), v);
for (i, elem) in new_list.list.iter().enumerate() {
assert_eq!(elem.id.peer, i as u64);
assert_eq!(elem.id.counter, i as Counter);
assert_eq!(elem.id.lamport, i as Lamport);
}
}
}