fix: remove counter & lamport from snapshot

This commit is contained in:
leeeon233 2023-02-17 09:51:29 +08:00 committed by Leonzhao
parent ea921e4c8f
commit ccfa3ee63d
2 changed files with 107 additions and 86 deletions

View file

@ -33,10 +33,10 @@ type Containers = Vec<ContainerID>;
pub(super) struct ChangeEncoding {
#[columnar(strategy = "Rle", original_type = "u32")]
pub(super) client_idx: ClientIdx,
#[columnar(strategy = "DeltaRle", original_type = "i32")]
pub(super) counter: Counter,
#[columnar(strategy = "DeltaRle", original_type = "u32")]
pub(super) lamport: Lamport,
// #[columnar(strategy = "DeltaRle", original_type = "i32")]
// pub(super) counter: Counter,
// #[columnar(strategy = "DeltaRle", original_type = "u32")]
// pub(super) lamport: Lamport,
#[columnar(strategy = "DeltaRle", original_type = "i64")]
pub(super) timestamp: Timestamp,
pub(super) op_len: u32,
@ -197,8 +197,8 @@ pub(super) fn encode_changes(store: &LogStore, vv: &VersionVector) -> Result<Vec
changes.push(ChangeEncoding {
client_idx: client_idx as ClientIdx,
counter: change.id.counter,
lamport: change.lamport,
// counter: change.id.counter,
// lamport: change.lamport,
timestamp: change.timestamp,
deps_len: change.deps.len() as u32,
op_len,
@ -259,8 +259,8 @@ pub(super) fn decode_changes_to_inner_format(
for change_encoding in this_change_encodings {
let ChangeEncoding {
client_idx,
counter: this_counter,
lamport: this_lamport,
// counter: this_counter,
// lamport: this_lamport,
timestamp,
op_len,
deps_len,

View file

@ -1,11 +1,12 @@
use fxhash::FxHashMap;
use itertools::Itertools;
use rle::{HasLength, RleVec, RleVecWithIndex};
use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, from_bytes, to_vec};
use smallvec::smallvec;
use crate::{
change::{Change, ChangeMergeCfg},
change::{Change, ChangeMergeCfg, Lamport},
container::text::text_content::SliceRange,
container::{
list::list_op::{DeleteSpan, InnerListOp},
@ -17,7 +18,7 @@ use crate::{
dag::remove_included_frontiers,
event::RawEvent,
hierarchy::Hierarchy,
id::{ClientID, ID},
id::{ClientID, Counter, ID},
log_store::ImportContext,
op::{InnerContent, Op},
span::{HasIdSpan, HasLamportSpan},
@ -143,6 +144,8 @@ pub(super) struct SnapshotEncoded {
containers: Containers,
container_states: Vec<EncodedStateContent>,
keys: Vec<InternalString>,
start_counter: Vec<Counter>,
start_lamport: Vec<Lamport>,
}
const ENCODED_UNKNOWN_SLICE: i64 = -2;
@ -192,10 +195,15 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
let mut client_id_to_idx: FxHashMap<ClientID, ClientIdx> = FxHashMap::default();
let mut clients = Vec::with_capacity(store.changes.len());
let mut change_num = 0;
let mut start_counter = Vec::new();
let mut start_lamport = Vec::new();
for (key, changes) in store.changes.iter() {
client_id_to_idx.insert(*key, clients.len() as ClientIdx);
clients.push(*key);
change_num += changes.merged_len();
start_counter.push(changes.first().unwrap().id.counter);
start_lamport.push(changes.first().unwrap().lamport);
}
let (_, containers) = store.reg.export();
@ -248,8 +256,8 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
changes.push(ChangeEncoding {
client_idx: client_idx as ClientIdx,
counter: change.id.counter,
lamport: change.lamport,
// counter: change.id.counter,
// lamport: change.lamport,
timestamp: change.timestamp,
deps_len: change.deps.len() as u32,
op_len: op_len as u32,
@ -279,6 +287,8 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
containers,
container_states,
keys,
start_counter,
start_lamport,
};
to_vec(&encoded).map_err(|e| LoroError::DecodeError(e.to_string().into()))
}
@ -298,6 +308,8 @@ pub(super) fn decode_snapshot(
containers,
container_states,
keys,
start_counter,
start_lamport,
} = encoded;
if change_encodings.is_empty() {
@ -320,89 +332,98 @@ pub(super) fn decode_snapshot(
container_idx2type.insert(container_idx, container_id.container_type());
}
for change_encoding in change_encodings {
let ChangeEncoding {
client_idx,
counter,
lamport,
timestamp,
op_len,
deps_len,
} = change_encoding;
for (client_idx, this_change_encodings) in
&change_encodings.into_iter().group_by(|c| c.client_idx)
{
let mut counter = start_counter[client_idx as usize];
let mut lamport = start_lamport[client_idx as usize];
for change_encoding in this_change_encodings {
let ChangeEncoding {
client_idx,
// counter: this_counter,
// lamport: this_lamport,
timestamp,
op_len,
deps_len,
} = change_encoding;
let client_id = clients[client_idx as usize];
let mut ops = RleVec::<[Op; 2]>::new();
let deps = (0..deps_len)
.map(|_| {
let raw = deps_iter.next().unwrap();
ID::new(clients[raw.client_idx as usize], raw.counter)
})
.collect();
let client_id = clients[client_idx as usize];
let mut ops = RleVec::<[Op; 2]>::new();
let deps = (0..deps_len)
.map(|_| {
let raw = deps_iter.next().unwrap();
ID::new(clients[raw.client_idx as usize], raw.counter)
})
.collect();
let mut op_counter = counter;
for op in op_iter.by_ref().take(op_len as usize) {
let SnapshotOpEncoding {
container: container_idx,
prop,
value,
value2,
} = op;
let mut delta = 0;
for op in op_iter.by_ref().take(op_len as usize) {
let SnapshotOpEncoding {
container: container_idx,
prop,
value,
value2,
} = op;
let container_idx = ContainerIdx::from_u32(container_idx);
let container_type = container_idx2type[&container_idx];
let content = match container_type {
ContainerType::Map => {
let key = keys[prop].clone();
InnerContent::Map(InnerMapSet {
key,
value: value as u32,
})
}
ContainerType::List | ContainerType::Text => {
let is_del = value2 == ENCODED_DELETED_CONTENT;
let list_op = if is_del {
InnerListOp::Delete(DeleteSpan {
pos: prop as isize,
len: value as isize,
let container_idx = ContainerIdx::from_u32(container_idx);
let container_type = container_idx2type[&container_idx];
let content = match container_type {
ContainerType::Map => {
let key = keys[prop].clone();
InnerContent::Map(InnerMapSet {
key,
value: value as u32,
})
} else {
let is_unknown = value2 == ENCODED_UNKNOWN_SLICE;
if is_unknown {
InnerListOp::Insert {
slice: SliceRange::new_unknown(value as u32),
pos: prop,
}
}
ContainerType::List | ContainerType::Text => {
let is_del = value2 == ENCODED_DELETED_CONTENT;
let list_op = if is_del {
InnerListOp::Delete(DeleteSpan {
pos: prop as isize,
len: value as isize,
})
} else {
InnerListOp::Insert {
slice: (value as u32..(value as i64 + value2) as u32).into(),
pos: prop,
let is_unknown = value2 == ENCODED_UNKNOWN_SLICE;
if is_unknown {
InnerListOp::Insert {
slice: SliceRange::new_unknown(value as u32),
pos: prop,
}
} else {
InnerListOp::Insert {
slice: (value as u32..(value as i64 + value2) as u32).into(),
pos: prop,
}
}
}
};
InnerContent::List(list_op)
}
};
let op = Op {
counter: op_counter,
container: container_idx,
content,
};
InnerContent::List(list_op)
}
};
let op = Op {
counter: counter + delta,
container: container_idx,
content,
};
delta += op.content_len() as i32;
ops.push(op);
}
let change = Change {
id: ID { client_id, counter },
lamport,
timestamp,
ops,
deps,
};
op_counter += op.content_len() as i32;
ops.push(op);
counter += delta;
lamport += delta as u32;
changes
.entry(client_id)
.or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new()))
.push(change);
}
let change = Change {
id: ID { client_id, counter },
lamport,
timestamp,
ops,
deps,
};
changes
.entry(client_id)
.or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new()))
.push(change);
}
let vv: VersionVector = changes