From ccfa3ee63d2e061856ed98ad33653fddc631dcef Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Fri, 17 Feb 2023 09:51:29 +0800 Subject: [PATCH] fix: remove counter & lamport from snapshot --- .../src/log_store/encoding/encode_changes.rs | 16 +- .../src/log_store/encoding/encode_snapshot.rs | 177 ++++++++++-------- 2 files changed, 107 insertions(+), 86 deletions(-) diff --git a/crates/loro-internal/src/log_store/encoding/encode_changes.rs b/crates/loro-internal/src/log_store/encoding/encode_changes.rs index 5322b3e2..5db1990f 100644 --- a/crates/loro-internal/src/log_store/encoding/encode_changes.rs +++ b/crates/loro-internal/src/log_store/encoding/encode_changes.rs @@ -33,10 +33,10 @@ type Containers = Vec; pub(super) struct ChangeEncoding { #[columnar(strategy = "Rle", original_type = "u32")] pub(super) client_idx: ClientIdx, - #[columnar(strategy = "DeltaRle", original_type = "i32")] - pub(super) counter: Counter, - #[columnar(strategy = "DeltaRle", original_type = "u32")] - pub(super) lamport: Lamport, + // #[columnar(strategy = "DeltaRle", original_type = "i32")] + // pub(super) counter: Counter, + // #[columnar(strategy = "DeltaRle", original_type = "u32")] + // pub(super) lamport: Lamport, #[columnar(strategy = "DeltaRle", original_type = "i64")] pub(super) timestamp: Timestamp, pub(super) op_len: u32, @@ -197,8 +197,8 @@ pub(super) fn encode_changes(store: &LogStore, vv: &VersionVector) -> Result, keys: Vec, + start_counter: Vec, + start_lamport: Vec, } const ENCODED_UNKNOWN_SLICE: i64 = -2; @@ -192,10 +195,15 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result, Lor let mut client_id_to_idx: FxHashMap = FxHashMap::default(); let mut clients = Vec::with_capacity(store.changes.len()); let mut change_num = 0; + let mut start_counter = Vec::new(); + let mut start_lamport = Vec::new(); for (key, changes) in store.changes.iter() { client_id_to_idx.insert(*key, clients.len() as ClientIdx); clients.push(*key); change_num += changes.merged_len(); + + start_counter.push(changes.first().unwrap().id.counter); + start_lamport.push(changes.first().unwrap().lamport); } let (_, containers) = store.reg.export(); @@ -248,8 +256,8 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result, Lor changes.push(ChangeEncoding { client_idx: client_idx as ClientIdx, - counter: change.id.counter, - lamport: change.lamport, + // counter: change.id.counter, + // lamport: change.lamport, timestamp: change.timestamp, deps_len: change.deps.len() as u32, op_len: op_len as u32, @@ -279,6 +287,8 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result, Lor containers, container_states, keys, + start_counter, + start_lamport, }; to_vec(&encoded).map_err(|e| LoroError::DecodeError(e.to_string().into())) } @@ -298,6 +308,8 @@ pub(super) fn decode_snapshot( containers, container_states, keys, + start_counter, + start_lamport, } = encoded; if change_encodings.is_empty() { @@ -320,89 +332,98 @@ pub(super) fn decode_snapshot( container_idx2type.insert(container_idx, container_id.container_type()); } - for change_encoding in change_encodings { - let ChangeEncoding { - client_idx, - counter, - lamport, - timestamp, - op_len, - deps_len, - } = change_encoding; + for (client_idx, this_change_encodings) in + &change_encodings.into_iter().group_by(|c| c.client_idx) + { + let mut counter = start_counter[client_idx as usize]; + let mut lamport = start_lamport[client_idx as usize]; + for change_encoding in this_change_encodings { + let ChangeEncoding { + client_idx, + // counter: this_counter, + // lamport: this_lamport, + timestamp, + op_len, + deps_len, + } = change_encoding; - let client_id = clients[client_idx as usize]; - let mut ops = RleVec::<[Op; 2]>::new(); - let deps = (0..deps_len) - .map(|_| { - let raw = deps_iter.next().unwrap(); - ID::new(clients[raw.client_idx as usize], raw.counter) - }) - .collect(); + let client_id = clients[client_idx as usize]; + let mut ops = RleVec::<[Op; 2]>::new(); + let deps = (0..deps_len) + .map(|_| { + let raw = deps_iter.next().unwrap(); + ID::new(clients[raw.client_idx as usize], raw.counter) + }) + .collect(); - let mut op_counter = counter; - for op in op_iter.by_ref().take(op_len as usize) { - let SnapshotOpEncoding { - container: container_idx, - prop, - value, - value2, - } = op; + let mut delta = 0; + for op in op_iter.by_ref().take(op_len as usize) { + let SnapshotOpEncoding { + container: container_idx, + prop, + value, + value2, + } = op; - let container_idx = ContainerIdx::from_u32(container_idx); - let container_type = container_idx2type[&container_idx]; - let content = match container_type { - ContainerType::Map => { - let key = keys[prop].clone(); - InnerContent::Map(InnerMapSet { - key, - value: value as u32, - }) - } - ContainerType::List | ContainerType::Text => { - let is_del = value2 == ENCODED_DELETED_CONTENT; - let list_op = if is_del { - InnerListOp::Delete(DeleteSpan { - pos: prop as isize, - len: value as isize, + let container_idx = ContainerIdx::from_u32(container_idx); + let container_type = container_idx2type[&container_idx]; + let content = match container_type { + ContainerType::Map => { + let key = keys[prop].clone(); + InnerContent::Map(InnerMapSet { + key, + value: value as u32, }) - } else { - let is_unknown = value2 == ENCODED_UNKNOWN_SLICE; - if is_unknown { - InnerListOp::Insert { - slice: SliceRange::new_unknown(value as u32), - pos: prop, - } + } + ContainerType::List | ContainerType::Text => { + let is_del = value2 == ENCODED_DELETED_CONTENT; + let list_op = if is_del { + InnerListOp::Delete(DeleteSpan { + pos: prop as isize, + len: value as isize, + }) } else { - InnerListOp::Insert { - slice: (value as u32..(value as i64 + value2) as u32).into(), - pos: prop, + let is_unknown = value2 == ENCODED_UNKNOWN_SLICE; + if is_unknown { + InnerListOp::Insert { + slice: SliceRange::new_unknown(value as u32), + pos: prop, + } + } else { + InnerListOp::Insert { + slice: (value as u32..(value as i64 + value2) as u32).into(), + pos: prop, + } } - } - }; - InnerContent::List(list_op) - } - }; - let op = Op { - counter: op_counter, - container: container_idx, - content, + }; + InnerContent::List(list_op) + } + }; + let op = Op { + counter: counter + delta, + container: container_idx, + content, + }; + delta += op.content_len() as i32; + ops.push(op); + } + + let change = Change { + id: ID { client_id, counter }, + lamport, + timestamp, + ops, + deps, }; - op_counter += op.content_len() as i32; - ops.push(op); + counter += delta; + lamport += delta as u32; + + changes + .entry(client_id) + .or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new())) + .push(change); } - - let change = Change { - id: ID { client_id, counter }, - lamport, - timestamp, - ops, - deps, - }; - changes - .entry(client_id) - .or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new())) - .push(change); } let vv: VersionVector = changes