chore: check bug

This commit is contained in:
leeeon233 2022-12-12 16:27:08 +08:00
parent 9b74125ba5
commit 972814aec5
3 changed files with 163 additions and 24 deletions

View file

@ -61,8 +61,8 @@ impl<O> HasLamport for Change<O> {
self.lamport
}
}
impl<O: Mergable + HasLength + HasIndex> HasLength for Change<O> {
use std::fmt::Debug;
impl<O: Mergable + HasLength + HasIndex + Debug> HasLength for Change<O> {
fn content_len(&self) -> usize {
self.ops.span().as_()
}

View file

@ -194,6 +194,15 @@ fn encode_changes(store: &LogStore, vv: &VersionVector) -> Encoded {
});
}
// println!("changes: {:?} bytes\nops: {:?} bytes\ndeps: {:?} bytes\nclients: {:?} bytes\ncontainers: {:?} bytes\nkeys: {:?} bytes\n",
// to_vec(&changes).unwrap().len(),
// to_vec(&ops).unwrap().len(),
// to_vec(&deps).unwrap().len(),
// to_vec(&clients).unwrap().len(),
// to_vec(&container_ids).unwrap().len(),
// to_vec(&keys).unwrap().len(),
// );
Encoded {
changes,
ops,

View file

@ -1,7 +1,9 @@
use fxhash::FxHashMap;
use rle::{HasLength, RleVec, RleVecWithIndex};
use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, compress, decompress, from_bytes, to_vec, CompressConfig};
use serde_columnar::{
columnar, compress, decompress, from_bytes, to_vec, ColumnarVec, CompressConfig,
};
use crate::{
change::{Change, ChangeMergeCfg},
@ -16,6 +18,7 @@ use crate::{
id::{ClientID, ID},
op::{InnerContent, Op},
span::{HasIdSpan, HasLamportSpan},
text::text_content::SliceRange,
version::TotalOrderStamp,
ContainerType, InternalString, LogStore, LoroValue, VersionVector,
};
@ -119,7 +122,7 @@ impl EncodedStateContent {
}
}
#[columnar(vec, ser, de)]
#[columnar(ser, de)]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SnapshotOpEncoding {
#[columnar(strategy = "Rle", original_type = "u32")]
@ -129,11 +132,112 @@ struct SnapshotOpEncoding {
prop: usize,
// #[columnar(compress(level = 0))]
// list range or del len or map value index
value: u64,
#[columnar(strategy = "BoolRle")]
is_del: bool,
value: u32,
#[columnar(strategy = "Rle")]
value2: i64,
}
const _: () = {
use serde::ser::SerializeTuple;
#[automatically_derived]
impl<IT> ::serde_columnar::RowSer<IT> for SnapshotOpEncoding
where
for<'c> &'c IT: IntoIterator<Item = &'c Self>,
{
const FIELD_NUM: usize = 4usize;
fn serialize_columns<S>(rows: &IT, ser: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let column0 = rows
.into_iter()
.map(|row| row.container)
.collect::<::std::vec::Vec<_>>();
let column0 = ::serde_columnar::RleColumn::<u32>::new(
column0,
::serde_columnar::ColumnAttr {
index: 0usize,
compress: None,
},
);
let column1 = rows
.into_iter()
.map(|row| row.prop)
.collect::<::std::vec::Vec<_>>();
let column1 = ::serde_columnar::DeltaRleColumn::<usize>::new(
column1,
::serde_columnar::ColumnAttr {
index: 1usize,
compress: None,
},
);
let column2 = rows
.into_iter()
.map(|row| row.value)
.collect::<::std::vec::Vec<_>>();
let column3 = rows
.into_iter()
.map(|row| row.value2)
.collect::<::std::vec::Vec<_>>();
let column3 = ::serde_columnar::RleColumn::new(
column3,
::serde_columnar::ColumnAttr {
index: 3usize,
compress: None,
},
);
println!(
"c {} p {} v {} d {}",
to_vec(&column0).unwrap().len(),
to_vec(&column1).unwrap().len(),
to_vec(&column2).unwrap().len(),
to_vec(&column3).unwrap().len()
);
let mut seq_encoder = ser.serialize_tuple(4usize)?;
seq_encoder.serialize_element(&column0)?;
seq_encoder.serialize_element(&column1)?;
seq_encoder.serialize_element(&column2)?;
seq_encoder.serialize_element(&column3)?;
seq_encoder.end()
}
}
};
const _: () = {
use serde::ser::SerializeTuple;
#[automatically_derived]
impl<'de, IT> ::serde_columnar::RowDe<'de, IT> for SnapshotOpEncoding
where
IT: FromIterator<Self> + Clone,
{
const FIELD_NUM: usize = 4usize;
fn deserialize_columns<D>(de: D) -> Result<IT, D::Error>
where
D: serde::Deserializer<'de>,
{
let (column0, column1, column2, column3): (
::serde_columnar::RleColumn<u32>,
::serde_columnar::DeltaRleColumn<usize>,
::std::vec::Vec<u32>,
::serde_columnar::RleColumn<i64>,
) = serde::de::Deserialize::deserialize(de)?;
let ans = ::serde_columnar::izip!(
column0.data.into_iter(),
column1.data.into_iter(),
column2.into_iter(),
column3.data.into_iter()
)
.map(|(container, prop, value, is_del)| Self {
container: container,
prop: prop,
value: value,
value2: is_del,
})
.collect();
Ok(ans)
}
}
};
#[columnar(ser, de)]
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct SnapshotEncoded {
@ -153,13 +257,24 @@ fn convert_inner_content(
op_content: &InnerContent,
key_to_idx: &mut FxHashMap<InternalString, usize>,
keys: &mut Vec<InternalString>,
) -> (usize, u64, bool) {
) -> (usize, u32, i64) {
let (prop, value, is_del) = match &op_content {
InnerContent::List(list_op) => match list_op {
InnerListOp::Insert { slice, pos } => {
(*pos, merge_2_u32_u64(slice.0.start, slice.0.end), false)
if slice.is_unknown() {
(*pos, slice.content_len() as u32, -2)
} else {
if (slice.0.end as i64) < 0 {
println!("GG");
}
(
*pos,
slice.0.start,
slice.0.end as i64, //merge_2_u32_u64(slice.0.start, slice.0.end),
)
}
}
InnerListOp::Delete(span) => (span.pos as usize, span.len as u64, true),
InnerListOp::Delete(span) => (span.pos as usize, span.len as u32, -1),
},
InnerContent::Map(map_set) => {
let InnerMapSet { key, value } = map_set;
@ -168,8 +283,8 @@ fn convert_inner_content(
keys.push(key.clone());
keys.len() - 1
}),
*value as u64,
false,
*value,
-1,
)
}
};
@ -215,13 +330,13 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> SnapshotEncoded {
.unwrap()
.to_export_snapshot(&op.content, gc);
for op_content in new_ops {
let (prop, value, is_del) =
let (prop, value, value2) =
convert_inner_content(&op_content, &mut key_to_idx, &mut keys);
ops.push(SnapshotOpEncoding {
container: container_idx.to_u32(),
prop,
value,
is_del,
value2,
});
}
}
@ -249,6 +364,16 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> SnapshotEncoded {
})
.collect();
// println!("changes: {:?} bytes\nops: {:?} bytes\ndeps: {:?} bytes\nclients: {:?} bytes\ncontainers: {:?} bytes\ncontainer states: {:?} bytes\nkeys: {:?} bytes\n",
// to_vec(&ColumnarVec::new(&changes)).unwrap().len(),
// to_vec(&ColumnarVec::new(&ops)).unwrap().len(),
// to_vec(&ColumnarVec::new(&deps)).unwrap().len(),
// to_vec(&clients).unwrap().len(),
// to_vec(&containers).unwrap().len(),
// to_vec(&container_states).unwrap().len(),
// to_vec(&keys).unwrap().len(),
// );
SnapshotEncoded {
changes,
ops,
@ -316,29 +441,35 @@ pub(super) fn decode_snapshot(store: &mut LogStore, encoded: SnapshotEncoded) {
container: container_idx,
prop,
value,
is_del,
value2,
} = op;
let container_idx = ContainerIdx::from_u32(container_idx);
let container = store.reg.get_by_idx(container_idx).unwrap();
let content = match container.lock().unwrap().type_() {
ContainerType::Map => {
let key = keys[prop].clone();
InnerContent::Map(InnerMapSet {
key,
value: value as u32,
})
InnerContent::Map(InnerMapSet { key, value })
}
ContainerType::List | ContainerType::Text => {
let is_del = value2 == -1;
let list_op = if is_del {
InnerListOp::Delete(DeleteSpan {
pos: prop as isize,
len: value as isize,
})
} else {
let (start, end) = split_u64_2_u32(value);
InnerListOp::Insert {
slice: (start..end).into(),
pos: prop,
let is_unknown = value2 == -2;
if is_unknown {
InnerListOp::Insert {
slice: SliceRange::new_unknown(value),
pos: prop,
}
} else {
InnerListOp::Insert {
slice: (value as u32..value2 as u32).into(),
pos: prop,
}
}
};
InnerContent::List(list_op)
@ -361,7 +492,6 @@ pub(super) fn decode_snapshot(store: &mut LogStore, encoded: SnapshotEncoded) {
ops,
deps,
};
changes
.entry(client_id)
.or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new()))