feat: movable list snapshot

This commit is contained in:
Zixuan Chen 2024-06-13 22:49:43 +08:00
parent 0d3709b79e
commit e857227330
No known key found for this signature in database

View file

@ -40,7 +40,7 @@ pub struct ListItem {
id: IdFull,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Element {
value: LoroValue,
value_id: IdLp,
@ -308,6 +308,19 @@ mod inner {
child_container_to_elem: FxHashMap<ContainerID, CompactIdLp>,
}
impl PartialEq for InnerState {
fn eq(&self, other: &Self) -> bool {
let v = self.id_to_list_leaf == other.id_to_list_leaf
&& self.elements == other.elements
&& self.child_container_to_elem == other.child_container_to_elem;
if !v {
false
} else {
self.list.iter().zip(other.list.iter()).all(|(a, b)| a == b)
}
}
}
fn eq<T: PartialEq>(a: T, b: T) -> Result<(), ()> {
if a == b {
Ok(())
@ -502,6 +515,8 @@ mod inner {
}
/// Drain the list items in the given range (op index).
///
/// We also remove the elements that are pointed by the list items.
pub fn list_drain(
&mut self,
range: std::ops::Range<usize>,
@ -1457,7 +1472,6 @@ struct EncodedId {
#[columnar(strategy = "DeltaRle")]
lamport: u32,
}
#[columnar(ser, de)]
struct EncodedSnapshot {
#[columnar(class = "vec", iter = "EncodedItem")]
@ -1466,6 +1480,299 @@ struct EncodedSnapshot {
ids: Vec<EncodedId>,
}
#[columnar(vec, ser, de, iterable)]
#[derive(Debug, Clone, Copy)]
struct EncodedItemForFastSnapshot {
#[columnar(strategy = "DeltaRle")]
invisible_list_item: usize,
#[columnar(strategy = "BoolRle")]
pos_id_eq_elem_id: bool,
// TODO: FIXME: I kinda hate this... We don't need to encode last_set_id if
// we remove the other snapshot mode.
#[columnar(strategy = "BoolRle")]
elem_id_eq_last_set_id: bool,
}
#[columnar(vec, ser, de, iterable)]
#[derive(Debug, Clone)]
struct EncodedIdFull {
#[columnar(strategy = "DeltaRle")]
peer_idx: usize,
#[columnar(strategy = "DeltaRle")]
counter: i32,
#[columnar(strategy = "DeltaRle")]
lamport: u32,
}
#[columnar(ser, de)]
struct EncodedFastSnapshot {
#[columnar(class = "vec", iter = "EncodedItemForFastSnapshot")]
items: Vec<EncodedItemForFastSnapshot>,
#[columnar(class = "vec", iter = "EncodedIdFull")]
list_item_ids: Vec<EncodedIdFull>,
#[columnar(class = "vec", iter = "EncodedId")]
elem_ids: Vec<EncodedId>,
#[columnar(class = "vec", iter = "EncodedId")]
last_set_ids: Vec<EncodedId>,
}
mod snapshot {
use std::io::Read;
use loro_common::{IdFull, IdLp, PeerID};
use crate::{
encoding::value_register::ValueRegister,
state::{ContainerState, FastStateSnashot},
};
use super::{
inner::PushElemInfo, EncodedFastSnapshot, EncodedId, EncodedIdFull, EncodedItem,
EncodedItemForFastSnapshot, MovableListState,
};
impl FastStateSnashot for MovableListState {
fn encode_snapshot_fast<W: std::io::prelude::Write>(&mut self, mut w: W) {
let value = self.get_value();
postcard::to_io(&value, &mut w).unwrap();
let mut peers: ValueRegister<PeerID> = ValueRegister::new();
let len = self.len();
let mut items = Vec::with_capacity(len);
// starts with a sentinel value. The num of `invisible_list_item` may be updated later
items.push(EncodedItemForFastSnapshot {
pos_id_eq_elem_id: true,
invisible_list_item: 0,
elem_id_eq_last_set_id: true,
});
let mut list_item_ids = Vec::with_capacity(self.len());
let mut elem_ids = Vec::new();
let mut last_set_ids = Vec::new();
for item in self.list().iter() {
if let Some(elem_id) = item.pointed_by {
let elem = self.elements().get(&elem_id).unwrap();
let eq = elem_id.to_id() == item.id.idlp();
let elem_id_eq_last_set_id = elem.value_id.compact() == elem_id;
items.push(EncodedItemForFastSnapshot {
invisible_list_item: 0,
pos_id_eq_elem_id: eq,
elem_id_eq_last_set_id,
});
list_item_ids.push(super::EncodedIdFull {
peer_idx: peers.register(&item.id.peer),
counter: item.id.counter,
lamport: item.id.lamport,
});
if !eq {
elem_ids.push(super::EncodedId {
peer_idx: peers.register(&elem_id.peer),
lamport: elem_id.lamport.get(),
})
}
if !elem_id_eq_last_set_id {
last_set_ids.push(super::EncodedId {
peer_idx: peers.register(&elem.value_id.peer),
lamport: elem.value_id.lamport,
})
}
} else {
items.last_mut().unwrap().invisible_list_item += 1;
list_item_ids.push(super::EncodedIdFull {
peer_idx: peers.register(&item.id.peer),
counter: item.id.counter,
lamport: item.id.lamport,
});
}
}
let peers = peers.unwrap_vec();
leb128::write::unsigned(&mut w, peers.len() as u64).unwrap();
for peer in peers {
w.write_all(&peer.to_le_bytes()).unwrap();
}
let v = serde_columnar::to_vec(&EncodedFastSnapshot {
items,
list_item_ids,
elem_ids,
last_set_ids,
})
.unwrap();
w.write_all(&v).unwrap();
}
fn decode_value(bytes: &[u8]) -> loro_common::LoroResult<(loro_common::LoroValue, &[u8])> {
postcard::take_from_bytes(bytes).map_err(|_| {
loro_common::LoroError::DecodeError(
"Decode list value failed".to_string().into_boxed_str(),
)
})
}
fn decode_snapshot_fast(
idx: crate::container::idx::ContainerIdx,
(list_value, mut bytes): (loro_common::LoroValue, &[u8]),
) -> loro_common::LoroResult<Self>
where
Self: Sized,
{
let peer_num = leb128::read::unsigned(&mut bytes).unwrap() as usize;
let mut peers = Vec::with_capacity(peer_num);
for _ in 0..peer_num {
let mut buf = [0u8; 8];
bytes.read_exact(&mut buf).unwrap();
peers.push(PeerID::from_le_bytes(buf));
}
let mut ans = MovableListState::new(idx);
let iters = serde_columnar::iter_from_bytes::<EncodedFastSnapshot>(bytes).unwrap();
let mut elem_iter = iters.elem_ids;
let item_iter = iters.items;
let mut list_item_id_iter = iters.list_item_ids;
let mut last_set_id_iter = iters.last_set_ids;
let mut is_first = true;
let list_value = list_value.into_list().unwrap();
let mut list_value_iter = list_value.iter();
for item in item_iter {
let EncodedItemForFastSnapshot {
invisible_list_item,
pos_id_eq_elem_id,
elem_id_eq_last_set_id,
} = item.unwrap();
if !is_first {
let EncodedIdFull {
peer_idx,
counter,
lamport,
} = list_item_id_iter.next().unwrap().unwrap();
let id_full = IdFull::new(peers[peer_idx], counter, lamport);
let elem_id = if pos_id_eq_elem_id {
id_full.idlp()
} else {
let EncodedId { peer_idx, lamport } = elem_iter.next().unwrap().unwrap();
IdLp::new(peers[peer_idx], lamport)
};
let last_set_id = if elem_id_eq_last_set_id {
elem_id
} else {
let EncodedId { peer_idx, lamport } =
last_set_id_iter.next().unwrap().unwrap();
IdLp::new(peers[peer_idx], lamport)
};
let value = list_value_iter.next().unwrap();
ans.inner.push_inner(
id_full,
Some(PushElemInfo {
elem_id: elem_id.compact(),
value: value.clone(),
last_set_id,
}),
)
}
is_first = false;
for _ in 0..invisible_list_item {
let EncodedIdFull {
peer_idx,
counter,
lamport,
} = list_item_id_iter.next().unwrap().unwrap();
let id_full = IdFull::new(peers[peer_idx], counter, lamport);
ans.inner.push_inner(id_full, None);
}
}
debug_assert!(elem_iter.next().is_none());
debug_assert!(list_item_id_iter.next().is_none());
debug_assert!(last_set_id_iter.next().is_none());
debug_assert!(list_value_iter.next().is_none());
Ok(ans)
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use loro_common::{CompactIdLp, ContainerID, LoroValue, ID};
use crate::container::idx::ContainerIdx;
use super::*;
#[test]
fn test_movable_list_snapshot() {
let mut list = MovableListState::new(ContainerIdx::from_index_and_type(
0,
loro_common::ContainerType::MovableList,
));
list.inner.insert_list_item(0, IdFull::new(9, 9, 9));
list.inner.insert_list_item(1, IdFull::new(0, 0, 0));
let _ = list.create_new_elem(
CompactIdLp::new(10, 10),
IdLp {
lamport: 0,
peer: 0,
},
LoroValue::Container(ContainerID::new_normal(
ID::new(10, 10),
loro_common::ContainerType::Text,
)),
IdLp {
lamport: 1,
peer: 2,
},
);
list.inner.insert_list_item(2, IdFull::new(1, 1, 1));
list.inner.insert_list_item(3, IdFull::new(2, 2, 2));
list.inner.insert_list_item(4, IdFull::new(3, 3, 3));
let _ = list.create_new_elem(
CompactIdLp::new(3, 8),
IdLp {
lamport: 3,
peer: 3,
},
LoroValue::String(Arc::new("abc".to_string())),
IdLp {
lamport: 4,
peer: 5,
},
);
let mut bytes = Vec::new();
list.encode_snapshot_fast(&mut bytes);
let (v, bytes) = MovableListState::decode_value(&bytes).unwrap();
assert_eq!(
v,
vec![
LoroValue::Container(ContainerID::new_normal(
ID::new(10, 10),
loro_common::ContainerType::Text,
)),
LoroValue::String(Arc::new("abc".to_string())),
]
.into()
);
let mut list2 = MovableListState::decode_snapshot_fast(
ContainerIdx::from_index_and_type(0, loro_common::ContainerType::MovableList),
(v.clone(), bytes),
)
.unwrap();
assert_eq!(&list2.get_value(), &v);
assert_eq!(&list.inner, &list2.inner);
}
}
}
#[cfg(test)]
mod test {
use crate::{HandlerTrait, LoroDoc, ToJson};