From e8572273301bab71d797635e9168c420f769d7b9 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 13 Jun 2024 22:49:43 +0800 Subject: [PATCH] feat: movable list snapshot --- .../src/state/movable_list_state.rs | 311 +++++++++++++++++- 1 file changed, 309 insertions(+), 2 deletions(-) diff --git a/crates/loro-internal/src/state/movable_list_state.rs b/crates/loro-internal/src/state/movable_list_state.rs index de0b6dc7..f6983034 100644 --- a/crates/loro-internal/src/state/movable_list_state.rs +++ b/crates/loro-internal/src/state/movable_list_state.rs @@ -40,7 +40,7 @@ pub struct ListItem { id: IdFull, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub(crate) struct Element { value: LoroValue, value_id: IdLp, @@ -308,6 +308,19 @@ mod inner { child_container_to_elem: FxHashMap, } + impl PartialEq for InnerState { + fn eq(&self, other: &Self) -> bool { + let v = self.id_to_list_leaf == other.id_to_list_leaf + && self.elements == other.elements + && self.child_container_to_elem == other.child_container_to_elem; + if !v { + false + } else { + self.list.iter().zip(other.list.iter()).all(|(a, b)| a == b) + } + } + } + fn eq(a: T, b: T) -> Result<(), ()> { if a == b { Ok(()) @@ -502,6 +515,8 @@ mod inner { } /// Drain the list items in the given range (op index). + /// + /// We also remove the elements that are pointed by the list items. pub fn list_drain( &mut self, range: std::ops::Range, @@ -1457,7 +1472,6 @@ struct EncodedId { #[columnar(strategy = "DeltaRle")] lamport: u32, } - #[columnar(ser, de)] struct EncodedSnapshot { #[columnar(class = "vec", iter = "EncodedItem")] @@ -1466,6 +1480,299 @@ struct EncodedSnapshot { ids: Vec, } +#[columnar(vec, ser, de, iterable)] +#[derive(Debug, Clone, Copy)] +struct EncodedItemForFastSnapshot { + #[columnar(strategy = "DeltaRle")] + invisible_list_item: usize, + #[columnar(strategy = "BoolRle")] + pos_id_eq_elem_id: bool, + // TODO: FIXME: I kinda hate this... We don't need to encode last_set_id if + // we remove the other snapshot mode. + #[columnar(strategy = "BoolRle")] + elem_id_eq_last_set_id: bool, +} + +#[columnar(vec, ser, de, iterable)] +#[derive(Debug, Clone)] +struct EncodedIdFull { + #[columnar(strategy = "DeltaRle")] + peer_idx: usize, + #[columnar(strategy = "DeltaRle")] + counter: i32, + #[columnar(strategy = "DeltaRle")] + lamport: u32, +} + +#[columnar(ser, de)] +struct EncodedFastSnapshot { + #[columnar(class = "vec", iter = "EncodedItemForFastSnapshot")] + items: Vec, + #[columnar(class = "vec", iter = "EncodedIdFull")] + list_item_ids: Vec, + #[columnar(class = "vec", iter = "EncodedId")] + elem_ids: Vec, + #[columnar(class = "vec", iter = "EncodedId")] + last_set_ids: Vec, +} + +mod snapshot { + use std::io::Read; + + use loro_common::{IdFull, IdLp, PeerID}; + + use crate::{ + encoding::value_register::ValueRegister, + state::{ContainerState, FastStateSnashot}, + }; + + use super::{ + inner::PushElemInfo, EncodedFastSnapshot, EncodedId, EncodedIdFull, EncodedItem, + EncodedItemForFastSnapshot, MovableListState, + }; + + impl FastStateSnashot for MovableListState { + fn encode_snapshot_fast(&mut self, mut w: W) { + let value = self.get_value(); + postcard::to_io(&value, &mut w).unwrap(); + let mut peers: ValueRegister = ValueRegister::new(); + let len = self.len(); + let mut items = Vec::with_capacity(len); + // starts with a sentinel value. The num of `invisible_list_item` may be updated later + items.push(EncodedItemForFastSnapshot { + pos_id_eq_elem_id: true, + invisible_list_item: 0, + elem_id_eq_last_set_id: true, + }); + + let mut list_item_ids = Vec::with_capacity(self.len()); + let mut elem_ids = Vec::new(); + let mut last_set_ids = Vec::new(); + for item in self.list().iter() { + if let Some(elem_id) = item.pointed_by { + let elem = self.elements().get(&elem_id).unwrap(); + let eq = elem_id.to_id() == item.id.idlp(); + let elem_id_eq_last_set_id = elem.value_id.compact() == elem_id; + items.push(EncodedItemForFastSnapshot { + invisible_list_item: 0, + pos_id_eq_elem_id: eq, + elem_id_eq_last_set_id, + }); + + list_item_ids.push(super::EncodedIdFull { + peer_idx: peers.register(&item.id.peer), + counter: item.id.counter, + lamport: item.id.lamport, + }); + if !eq { + elem_ids.push(super::EncodedId { + peer_idx: peers.register(&elem_id.peer), + lamport: elem_id.lamport.get(), + }) + } + if !elem_id_eq_last_set_id { + last_set_ids.push(super::EncodedId { + peer_idx: peers.register(&elem.value_id.peer), + lamport: elem.value_id.lamport, + }) + } + } else { + items.last_mut().unwrap().invisible_list_item += 1; + list_item_ids.push(super::EncodedIdFull { + peer_idx: peers.register(&item.id.peer), + counter: item.id.counter, + lamport: item.id.lamport, + }); + } + } + + let peers = peers.unwrap_vec(); + leb128::write::unsigned(&mut w, peers.len() as u64).unwrap(); + for peer in peers { + w.write_all(&peer.to_le_bytes()).unwrap(); + } + + let v = serde_columnar::to_vec(&EncodedFastSnapshot { + items, + list_item_ids, + elem_ids, + last_set_ids, + }) + .unwrap(); + w.write_all(&v).unwrap(); + } + + fn decode_value(bytes: &[u8]) -> loro_common::LoroResult<(loro_common::LoroValue, &[u8])> { + postcard::take_from_bytes(bytes).map_err(|_| { + loro_common::LoroError::DecodeError( + "Decode list value failed".to_string().into_boxed_str(), + ) + }) + } + + fn decode_snapshot_fast( + idx: crate::container::idx::ContainerIdx, + (list_value, mut bytes): (loro_common::LoroValue, &[u8]), + ) -> loro_common::LoroResult + where + Self: Sized, + { + let peer_num = leb128::read::unsigned(&mut bytes).unwrap() as usize; + let mut peers = Vec::with_capacity(peer_num); + for _ in 0..peer_num { + let mut buf = [0u8; 8]; + bytes.read_exact(&mut buf).unwrap(); + peers.push(PeerID::from_le_bytes(buf)); + } + + let mut ans = MovableListState::new(idx); + + let iters = serde_columnar::iter_from_bytes::(bytes).unwrap(); + let mut elem_iter = iters.elem_ids; + let item_iter = iters.items; + let mut list_item_id_iter = iters.list_item_ids; + let mut last_set_id_iter = iters.last_set_ids; + let mut is_first = true; + + let list_value = list_value.into_list().unwrap(); + let mut list_value_iter = list_value.iter(); + for item in item_iter { + let EncodedItemForFastSnapshot { + invisible_list_item, + pos_id_eq_elem_id, + elem_id_eq_last_set_id, + } = item.unwrap(); + + if !is_first { + let EncodedIdFull { + peer_idx, + counter, + lamport, + } = list_item_id_iter.next().unwrap().unwrap(); + let id_full = IdFull::new(peers[peer_idx], counter, lamport); + let elem_id = if pos_id_eq_elem_id { + id_full.idlp() + } else { + let EncodedId { peer_idx, lamport } = elem_iter.next().unwrap().unwrap(); + IdLp::new(peers[peer_idx], lamport) + }; + + let last_set_id = if elem_id_eq_last_set_id { + elem_id + } else { + let EncodedId { peer_idx, lamport } = + last_set_id_iter.next().unwrap().unwrap(); + IdLp::new(peers[peer_idx], lamport) + }; + + let value = list_value_iter.next().unwrap(); + ans.inner.push_inner( + id_full, + Some(PushElemInfo { + elem_id: elem_id.compact(), + value: value.clone(), + last_set_id, + }), + ) + } + + is_first = false; + for _ in 0..invisible_list_item { + let EncodedIdFull { + peer_idx, + counter, + lamport, + } = list_item_id_iter.next().unwrap().unwrap(); + let id_full = IdFull::new(peers[peer_idx], counter, lamport); + ans.inner.push_inner(id_full, None); + } + } + + debug_assert!(elem_iter.next().is_none()); + debug_assert!(list_item_id_iter.next().is_none()); + debug_assert!(last_set_id_iter.next().is_none()); + debug_assert!(list_value_iter.next().is_none()); + + Ok(ans) + } + } + + #[cfg(test)] + mod test { + use std::sync::Arc; + + use loro_common::{CompactIdLp, ContainerID, LoroValue, ID}; + + use crate::container::idx::ContainerIdx; + + use super::*; + + #[test] + fn test_movable_list_snapshot() { + let mut list = MovableListState::new(ContainerIdx::from_index_and_type( + 0, + loro_common::ContainerType::MovableList, + )); + + list.inner.insert_list_item(0, IdFull::new(9, 9, 9)); + list.inner.insert_list_item(1, IdFull::new(0, 0, 0)); + let _ = list.create_new_elem( + CompactIdLp::new(10, 10), + IdLp { + lamport: 0, + peer: 0, + }, + LoroValue::Container(ContainerID::new_normal( + ID::new(10, 10), + loro_common::ContainerType::Text, + )), + IdLp { + lamport: 1, + peer: 2, + }, + ); + list.inner.insert_list_item(2, IdFull::new(1, 1, 1)); + list.inner.insert_list_item(3, IdFull::new(2, 2, 2)); + list.inner.insert_list_item(4, IdFull::new(3, 3, 3)); + let _ = list.create_new_elem( + CompactIdLp::new(3, 8), + IdLp { + lamport: 3, + peer: 3, + }, + LoroValue::String(Arc::new("abc".to_string())), + IdLp { + lamport: 4, + peer: 5, + }, + ); + + let mut bytes = Vec::new(); + list.encode_snapshot_fast(&mut bytes); + + let (v, bytes) = MovableListState::decode_value(&bytes).unwrap(); + assert_eq!( + v, + vec![ + LoroValue::Container(ContainerID::new_normal( + ID::new(10, 10), + loro_common::ContainerType::Text, + )), + LoroValue::String(Arc::new("abc".to_string())), + ] + .into() + ); + let mut list2 = MovableListState::decode_snapshot_fast( + ContainerIdx::from_index_and_type(0, loro_common::ContainerType::MovableList), + (v.clone(), bytes), + ) + .unwrap(); + assert_eq!(&list2.get_value(), &v); + assert_eq!(&list.inner, &list2.inner); + } + } +} + #[cfg(test)] mod test { use crate::{HandlerTrait, LoroDoc, ToJson};