feat: block encode header (#458)

* bk

* feat: encode header

* chore: use columnar 0.3.9

* fix: not need encode last lamport and length

* chore: cargo fix

* fix: print

* bk

* chore: columnar 0.3.10
This commit is contained in:
Leon Zhao 2024-09-11 22:53:10 +08:00 committed by GitHub
parent 71e46f65f1
commit dfc99c1746
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 255 additions and 248 deletions

24
Cargo.lock generated
View file

@ -491,9 +491,9 @@ dependencies = [
[[package]]
name = "darling"
version = "0.20.3"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e"
checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989"
dependencies = [
"darling_core",
"darling_macro",
@ -501,9 +501,9 @@ dependencies = [
[[package]]
name = "darling_core"
version = "0.20.3"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621"
checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5"
dependencies = [
"fnv",
"ident_case",
@ -515,9 +515,9 @@ dependencies = [
[[package]]
name = "darling_macro"
version = "0.20.3"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5"
checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core",
"quote 1.0.35",
@ -2024,9 +2024,9 @@ dependencies = [
[[package]]
name = "serde_columnar"
version = "0.3.7"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3748cbf2b43a15ee9627881cabd7820d50508781cdebd3bb54cea49215d367a1"
checksum = "6d4e3c0e46450edf7da174b610b9143eb8ca22059ace5016741fc9e20b88d1e7"
dependencies = [
"itertools 0.11.0",
"postcard",
@ -2037,9 +2037,9 @@ dependencies = [
[[package]]
name = "serde_columnar_derive"
version = "0.3.4"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e5eaacabbc55a397ffbb1ee32523f40f86fdefea8a8d9db19630d8b7c00edd1"
checksum = "42c5d47942b2a7e76118b697fc0f94516a5d8366a3c0fee8d0e2b713e952e306"
dependencies = [
"darling",
"proc-macro2 1.0.75",
@ -2151,9 +2151,9 @@ dependencies = [
[[package]]
name = "strsim"
version = "0.10.0"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"

View file

@ -20,7 +20,7 @@ enum_dispatch = "0.3.11"
enum-as-inner = "0.5.1"
fxhash = "0.2.1"
tracing = { version = "0.1" }
serde_columnar = { version = "0.3.7" }
serde_columnar = { version = "0.3.10" }
serde_json = "1.0"
thiserror = "1"
smallvec = { version = "1.8.0", features = ["serde"] }

View file

@ -1,4 +1,3 @@
use std::time::Instant;
use dev_utils::{get_mem_usage, ByteSize};
use loro::{CommitOptions, LoroCounter, LoroDoc, LoroMap};

View file

@ -1,6 +1,6 @@
use dev_utils::ByteSize;
use loro::LoroDoc;
use std::time::{Duration, Instant};
use std::time::Instant;
pub fn bench_fast_snapshot(doc: &LoroDoc) {
let old_v;

View file

@ -10,7 +10,6 @@ use loro::{
event::Diff, Container, ContainerID, ContainerType, LoroDoc, LoroError, LoroTree, LoroValue,
TreeExternalDiff, TreeID,
};
use tracing::{debug, trace};
use crate::{
actions::{Actionable, FromGenericAction, GenericAction},

View file

@ -1,4 +1,4 @@
use std::{fmt::Display, io::Write, str::Bytes, sync::Arc};
use std::{fmt::Display, io::Write, sync::Arc};
use arbitrary::Arbitrary;
use enum_as_inner::EnumAsInner;

View file

@ -6,7 +6,7 @@ use generic_btree::{
};
use loro_common::{Counter, HasId, HasIdSpan, IdFull, IdSpan, Lamport, PeerID, ID};
use rle::HasLength as _;
use tracing::{instrument, trace};
use tracing::instrument;
use crate::{cursor::AbsolutePosition, VersionVector};

View file

@ -12,7 +12,6 @@ use enum_as_inner::EnumAsInner;
use loro_common::{CompactIdLp, ContainerType, CounterSpan, IdFull, IdLp, IdSpan};
use rle::{HasIndex, HasLength, Mergable, Sliceable};
use serde::{ser::SerializeSeq, Deserialize, Serialize};
use smallvec::SmallVec;
use std::{borrow::Cow, ops::Range};
mod content;

View file

@ -8,7 +8,7 @@ use std::cell::RefCell;
use std::cmp::Ordering;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use tracing::{debug, instrument, trace, trace_span};
use tracing::{debug, trace, trace_span};
use self::change_store::iter::MergedChangeIter;
use self::pending_changes::PendingChanges;
@ -559,6 +559,7 @@ impl OpLog {
#[inline(never)]
pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option<ID> {
let change = self.change_store.get_change_by_lamport_lte(id)?;
if change.lamport > id.lamport || change.lamport_end() <= id.lamport {
return None;
}

View file

@ -1,5 +1,5 @@
use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
use super::{loro_dag::AppDagNodeInner, AppDag, AppDagNode};
use super::{loro_dag::AppDagNodeInner, AppDagNode};
use crate::{
arena::SharedArena,
change::{Change, Timestamp},
@ -7,12 +7,12 @@ use crate::{
kv_store::KvStore,
op::Op,
parent::register_container_and_parent_link,
version::{shrink_frontiers, Frontiers, ImVersionVector},
version::{Frontiers, ImVersionVector},
VersionVector,
};
use block_encode::decode_block_range;
use bytes::Bytes;
use fxhash::{FxHashMap, FxHashSet};
use fxhash::FxHashMap;
use itertools::Itertools;
use loro_common::{
Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError,
@ -22,7 +22,6 @@ use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
use std::{
borrow::BorrowMut,
cmp::Ordering,
collections::{BTreeMap, VecDeque},
ops::{Bound, Deref},
@ -31,6 +30,7 @@ use std::{
use tracing::{debug, info_span, trace, warn};
mod block_encode;
mod block_meta_encode;
mod delta_rle_encode;
pub(super) mod iter;

View file

@ -24,7 +24,7 @@
//! │ N LEB128 Delta Lamports │◁───┘
//! └──────────────────────────────────────────────────────────────┘
//! ┌──────────────────────────────────────────────────────────────┐
//! │ N LEB128 Delta Rle Timestamps
//! │ N LEB128 DeltaOfDelta Timestamps
//! └──────────────────────────────────────────────────────────────┘
//! ┌────────────────────────────────┬─────────────────────────────┐
//! │ N Rle Commit Msg Lengths │ Commit Messages │
@ -77,10 +77,10 @@ use loro_common::{
use once_cell::sync::OnceCell;
use rle::HasLength;
use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, DeltaRleDecoder, Itertools};
use serde_columnar::{columnar, AnyRleDecoder, DeltaOfDeltaDecoder, Itertools};
use tracing::info;
use super::delta_rle_encode::{UnsignedDeltaDecoder, UnsignedDeltaEncoder};
use super::block_meta_encode::decode_changes_header;
use crate::arena::SharedArena;
use crate::change::{Change, Timestamp};
use crate::container::tree::tree_op;
@ -90,9 +90,6 @@ use crate::encoding::{
self, decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
};
use crate::op::Op;
use serde_columnar::{
AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaRleEncoder,
};
#[derive(Debug, Serialize, Deserialize)]
struct EncodedBlock<'a> {
@ -102,25 +99,10 @@ struct EncodedBlock<'a> {
lamport_len: u32,
n_changes: u32,
#[serde(borrow)]
peers: Cow<'a, [u8]>,
header: Cow<'a, [u8]>,
// timestamp and commit messages
#[serde(borrow)]
lengths: Cow<'a, [u8]>,
#[serde(borrow)]
dep_on_self: Cow<'a, [u8]>,
#[serde(borrow)]
dep_len: Cow<'a, [u8]>,
#[serde(borrow)]
dep_peer_idxs: Cow<'a, [u8]>,
#[serde(borrow)]
dep_counters: Cow<'a, [u8]>,
#[serde(borrow)]
lamports: Cow<'a, [u8]>,
#[serde(borrow)]
timestamps: Cow<'a, [u8]>,
#[serde(borrow)]
commit_msg_lengths: Cow<'a, [u8]>,
#[serde(borrow)]
commit_msgs: Cow<'a, [u8]>,
change_meta: Cow<'a, [u8]>,
// ---------------------- Ops ----------------------
#[serde(borrow)]
cids: Cow<'a, [u8]>,
@ -137,22 +119,9 @@ struct EncodedBlock<'a> {
}
fn diagnose_block(block: &EncodedBlock) {
use std::mem;
info!("Diagnosing EncodedBlock:");
info!(" peers: {} bytes", block.peers.len());
info!(" lengths: {} bytes", block.lengths.len());
info!(" dep_on_self: {} bytes", block.dep_on_self.len());
info!(" dep_len: {} bytes", block.dep_len.len());
info!(" dep_peer_idxs: {} bytes", block.dep_peer_idxs.len());
info!(" dep_counters: {} bytes", block.dep_counters.len());
info!(" lamports: {} bytes", block.lamports.len());
info!(" timestamps: {} bytes", block.timestamps.len());
info!(
" commit_msg_lengths: {} bytes",
block.commit_msg_lengths.len()
);
info!(" commit_msgs: {} bytes", block.commit_msgs.len());
info!(" header {} bytes", block.header.len());
info!(" change_meta {} bytes", block.change_meta.len());
info!(" cids: {} bytes", block.cids.len());
info!(" keys: {} bytes", block.keys.len());
info!(" positions: {} bytes", block.positions.len());
@ -175,47 +144,6 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
peer_register.register(&peer);
let cid_register: ValueRegister<ContainerID> = ValueRegister::new();
let mut timestamp_encoder = DeltaRleEncoder::new();
let mut lamport_encoder = UnsignedDeltaEncoder::new(block.len() * 2 + 4);
let mut commit_msg_len_encoder = AnyRleEncoder::<u32>::new();
let mut commit_msgs = String::new();
let mut dep_self_encoder = BoolRleEncoder::new();
let mut dep_len_encoder = AnyRleEncoder::<u64>::new();
let mut encoded_deps = EncodedDeps {
peer_idx: AnyRleEncoder::new(),
counter: AnyRleEncoder::new(),
};
for c in block {
timestamp_encoder.append(c.timestamp()).unwrap();
lamport_encoder.push(c.lamport() as u64);
if let Some(msg) = c.commit_msg.as_ref() {
commit_msg_len_encoder.append(msg.len() as u32).unwrap();
commit_msgs.push_str(msg);
} else {
commit_msg_len_encoder.append(0).unwrap();
}
let mut dep_on_self = false;
for dep in c.deps().iter() {
if dep.peer == peer {
dep_on_self = true;
} else {
let peer_idx = peer_register.register(&dep.peer);
encoded_deps.peer_idx.append(peer_idx as u32).unwrap();
encoded_deps.counter.append(dep.counter as u32).unwrap();
}
}
dep_self_encoder.append(dep_on_self).unwrap();
dep_len_encoder
.append(if dep_on_self {
c.deps().len() as u64 - 1
} else {
c.deps().len() as u64
})
.unwrap();
}
let mut encoded_ops = Vec::new();
let mut registers = Registers {
@ -281,16 +209,6 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
// Write to output
// PeerIDs
let peers = registers.peer_register.unwrap_vec();
let peer_bytes: Vec<u8> = peers.iter().flat_map(|p| p.to_le_bytes()).collect();
// First Counter + Change Len
let mut lengths_bytes = Vec::new();
for c in block {
leb128::write::unsigned(&mut lengths_bytes, c.atom_len() as u64).unwrap();
}
// ┌────────────────────┬─────────────────────────────────────────┐
// │ Key Strings Size │ Key Strings │
// └────────────────────┴─────────────────────────────────────────┘
@ -330,6 +248,14 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
// │Value Bytes Size│ Value Bytes │
// └────────────────┴─────────────────────────────────────────────┘
// PeerIDs
let mut peer_register = registers.peer_register;
// .unwrap_vec();
// let peer_bytes: Vec<u8> = peers.iter().flat_map(|p| p.to_le_bytes()).collect();
// Change meta
let (header, change_meta) = encode_changes(block, &mut peer_register);
let value_bytes = value_writer.finish();
let out = EncodedBlock {
counter_start: block[0].id.counter as u32,
@ -337,16 +263,8 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
lamport_start: block[0].lamport(),
lamport_len: block.last().unwrap().lamport_end() - block[0].lamport(),
n_changes: block.len() as u32,
peers: Cow::Owned(peer_bytes),
lengths: Cow::Owned(lengths_bytes),
dep_on_self: dep_self_encoder.finish().unwrap().into(),
dep_len: dep_len_encoder.finish().unwrap().into(),
dep_peer_idxs: encoded_deps.peer_idx.finish().unwrap().into(),
dep_counters: encoded_deps.counter.finish().unwrap().into(),
lamports: lamport_encoder.finish().0.into(),
timestamps: timestamp_encoder.finish().unwrap().into(),
commit_msg_lengths: commit_msg_len_encoder.finish().unwrap().into(),
commit_msgs: Cow::Owned(commit_msgs.into_bytes()),
header: header.into(),
change_meta: change_meta.into(),
cids: container_arena.encode().into(),
keys: keys_bytes.into(),
positions: position_bytes.into(),
@ -357,8 +275,7 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
diagnose_block(&out);
let ans = postcard::to_allocvec(&out).unwrap();
// info!("block size = {}", ans.len());
println!("BLOCK SIZE = {}", ans.len());
info!("block size = {}", ans.len());
ans
}
@ -395,6 +312,7 @@ use crate::encoding::value::{
RawTreeMove, Value, ValueDecodedArenasTrait, ValueEncodeRegister, ValueKind, ValueReader,
ValueWriter,
};
use crate::oplog::change_store::block_meta_encode::encode_changes;
use crate::version::Frontiers;
impl ValueEncodeRegister for Registers {
fn key_mut(&mut self) -> &mut ValueRegister<loro_common::InternalString> {
@ -476,107 +394,22 @@ pub fn decode_header(m_bytes: &[u8]) -> LoroResult<ChangesBlockHeader> {
fn decode_header_from_doc(doc: &EncodedBlock) -> Result<ChangesBlockHeader, LoroError> {
let EncodedBlock {
n_changes,
peers: peers_bytes,
lengths: lengths_bytes,
dep_on_self,
dep_len,
dep_peer_idxs,
dep_counters,
lamports,
header,
counter_len,
counter_start,
lamport_len,
lamport_start,
..
} = doc;
let first_counter = *counter_start as Counter;
let n_changes = *n_changes as usize;
let peer_num = peers_bytes.len() / 8;
let mut peers = Vec::with_capacity(peer_num);
for i in 0..peer_num {
let peer_id =
PeerID::from_le_bytes((&peers_bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap());
peers.push(peer_id);
}
// ┌───────────────────┬──────────────────────────────────────────┐ │
// │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata
// └───────────────────┴──────────────────────────────────────────┘ │
let mut lengths = Vec::with_capacity(n_changes);
let mut lengths_bytes: &[u8] = lengths_bytes;
for _ in 0..n_changes {
lengths.push(leb128::read::unsigned(&mut lengths_bytes).unwrap() as Counter);
}
// ┌───────────────────┬────────────────────────┬─────────────────┐ │
// │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘
// └───────────────────┴────────────────────────┴─────────────────┘
let mut dep_self_decoder = BoolRleDecoder::new(dep_on_self);
let mut this_counter = first_counter;
let mut deps: Vec<Frontiers> = Vec::with_capacity(n_changes);
let n = n_changes;
let mut deps_len = AnyRleDecoder::<u64>::new(dep_len);
let deps_peers_decoder = AnyRleDecoder::<u32>::new(dep_peer_idxs);
let deps_counters_decoder = AnyRleDecoder::<u32>::new(dep_counters);
let mut deps_peers_iter = deps_peers_decoder;
let mut deps_counters_iter = deps_counters_decoder;
for i in 0..n {
let mut f = Frontiers::default();
if dep_self_decoder.next().unwrap().unwrap() {
f.push(ID::new(peers[0], this_counter - 1))
}
let len = deps_len.next().unwrap().unwrap() as usize;
for _ in 0..len {
let peer_idx = deps_peers_iter.next().unwrap().unwrap() as usize;
let peer = peers[peer_idx];
let counter = deps_counters_iter.next().unwrap().unwrap() as Counter;
f.push(ID::new(peer, counter));
}
deps.push(f);
this_counter += lengths[i];
}
let mut counters = Vec::with_capacity(n + 1);
let mut last = first_counter;
for i in 0..n {
counters.push(last);
last += lengths[i];
}
counters.push(last);
assert_eq!(last, (counter_start + counter_len) as Counter);
let mut lamport_decoder = UnsignedDeltaDecoder::new(lamports, n_changes);
let mut lamports = Vec::with_capacity(n + 1);
for _ in 0..n {
lamports.push(lamport_decoder.next().unwrap() as Lamport);
}
let last_lamport = *lamports.last().unwrap();
lamports.push(last_lamport + lengths.last().copied().unwrap() as Lamport);
assert_eq!(
*lamports.last().unwrap(),
(lamport_start + lamport_len) as Lamport
let ans: ChangesBlockHeader = decode_changes_header(
&header,
*n_changes as usize,
*counter_start as Counter,
*counter_len as Counter,
*lamport_start,
*lamport_len,
);
Ok(ChangesBlockHeader {
peer: peers[0],
counter: first_counter,
n_changes,
peers,
counters,
deps_groups: deps,
lamports,
keys: OnceCell::new(),
cids: OnceCell::new(),
})
}
struct EncodedDeps {
peer_idx: AnyRleEncoder<u32>,
counter: AnyRleEncoder<u32>,
Ok(ans)
}
#[columnar(vec, ser, de, iterable)]
@ -710,10 +543,8 @@ pub fn decode_block(
});
let EncodedBlock {
n_changes,
timestamps,
counter_start: first_counter,
commit_msg_lengths,
commit_msgs,
change_meta,
cids,
keys,
ops,
@ -722,9 +553,12 @@ pub fn decode_block(
positions,
..
} = doc;
let mut changes = Vec::with_capacity(n_changes as usize);
let mut timestamp_decoder: DeltaRleDecoder<i64> = DeltaRleDecoder::new(&timestamps);
let mut commit_msg_len_decoder = AnyRleDecoder::<u32>::new(&commit_msg_lengths);
let n_changes = n_changes as usize;
let mut changes = Vec::with_capacity(n_changes);
let timestamp_decoder = DeltaOfDeltaDecoder::<i64>::new(&change_meta).unwrap();
let (timestamps, bytes) = timestamp_decoder.take_n_finalize(n_changes).unwrap();
let commit_msg_len_decoder = AnyRleDecoder::<u32>::new(bytes);
let (commit_msg_lens, commit_msgs) = commit_msg_len_decoder.take_n_finalize(n_changes).unwrap();
let mut commit_msg_index = 0;
let keys = header.keys.get_or_init(|| decode_keys(&keys));
let decode_arena = ValueDecodeArena {
@ -755,9 +589,9 @@ pub fn decode_block(
.delete_start_ids
.into_iter()
.map(Ok);
for i in 0..(n_changes as usize) {
for i in 0..n_changes {
let commit_msg: Option<Arc<str>> = {
let len = commit_msg_len_decoder.next().unwrap().unwrap();
let len = commit_msg_lens[i];
if len == 0 {
None
} else {
@ -779,7 +613,7 @@ pub fn decode_block(
deps: header.deps_groups[i].clone(),
id: ID::new(header.peer, header.counters[i]),
lamport: header.lamports[i],
timestamp: timestamp_decoder.next().unwrap().unwrap() as Timestamp,
timestamp: timestamps[i] as Timestamp,
commit_msg,
})
}
@ -826,7 +660,6 @@ pub fn decode_block(
change_index += 1;
}
}
Ok(changes)
}

View file

@ -0,0 +1,186 @@
use loro_common::{Counter, Lamport, PeerID, ID};
use once_cell::sync::OnceCell;
use rle::HasLength;
use serde_columnar::{
AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaOfDeltaDecoder,
DeltaOfDeltaEncoder,
};
use crate::{change::Change, encoding::value_register::ValueRegister, version::Frontiers};
use super::block_encode::ChangesBlockHeader;
pub(crate) fn encode_changes(
block: &[Change],
peer_register: &mut ValueRegister<PeerID>,
) -> (Vec<u8>, Vec<u8>) {
let peer = block[0].peer();
let mut timestamp_encoder = DeltaOfDeltaEncoder::new();
let mut lamport_encoder = DeltaOfDeltaEncoder::new();
let mut commit_msg_len_encoder = AnyRleEncoder::<u32>::new();
let mut commit_msgs = String::new();
let mut dep_self_encoder = BoolRleEncoder::new();
let mut dep_len_encoder = AnyRleEncoder::<usize>::new();
let mut encoded_deps = EncodedDeps {
peer_idx: AnyRleEncoder::new(),
counter: DeltaOfDeltaEncoder::new(),
};
// First Counter + Change Len
let mut lengths_bytes = Vec::new();
let mut counter = vec![];
let mut n = 0;
for (i, c) in block.iter().enumerate() {
counter.push(c.id.counter);
let is_last = i == block.len() - 1;
if !is_last {
leb128::write::unsigned(&mut lengths_bytes, c.atom_len() as u64).unwrap();
lamport_encoder.append(c.lamport() as i64).unwrap();
}
timestamp_encoder.append(c.timestamp()).unwrap();
if let Some(msg) = c.commit_msg.as_ref() {
commit_msg_len_encoder.append(msg.len() as u32).unwrap();
commit_msgs.push_str(msg);
} else {
commit_msg_len_encoder.append(0).unwrap();
}
let mut dep_on_self = false;
for dep in c.deps().iter() {
if dep.peer == peer {
dep_on_self = true;
} else {
let peer_idx = peer_register.register(&dep.peer);
encoded_deps.peer_idx.append(peer_idx as u32).unwrap();
encoded_deps.counter.append(dep.counter as i64).unwrap();
n += 1;
}
}
dep_self_encoder.append(dep_on_self).unwrap();
dep_len_encoder
.append(if dep_on_self {
c.deps().len() - 1
} else {
c.deps().len()
})
.unwrap();
}
// TODO: capacity
let mut ans = Vec::with_capacity(block.len() * 15);
let _ = leb128::write::unsigned(&mut ans, peer_register.vec().len() as u64);
ans.extend(peer_register.vec().iter().flat_map(|p| p.to_le_bytes()));
ans.append(&mut lengths_bytes);
ans.append(&mut dep_self_encoder.finish().unwrap());
ans.append(&mut dep_len_encoder.finish().unwrap());
ans.append(&mut encoded_deps.peer_idx.finish().unwrap());
ans.append(&mut encoded_deps.counter.finish().unwrap());
ans.append(&mut lamport_encoder.finish().unwrap());
let mut t = timestamp_encoder.finish().unwrap();
let mut cml = commit_msg_len_encoder.finish().unwrap();
let mut cms = commit_msgs.into_bytes();
let mut meta = Vec::with_capacity(t.len() + cml.len() + cms.len());
meta.append(&mut t);
meta.append(&mut cml);
meta.append(&mut cms);
(ans, meta)
}
pub(crate) fn decode_changes_header(
mut bytes: &[u8],
n_changes: usize,
first_counter: Counter,
counter_len: Counter,
lamport_start: Lamport,
lamport_len: Lamport,
) -> ChangesBlockHeader {
let mut this_counter = first_counter;
let peer_num = leb128::read::unsigned(&mut bytes).unwrap() as usize;
let mut peers = Vec::with_capacity(peer_num);
for i in 0..peer_num {
let peer_id = PeerID::from_le_bytes((&bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap());
peers.push(peer_id);
}
let mut bytes = &bytes[8 * peer_num..];
// ┌───────────────────┬──────────────────────────────────────────┐ │
// │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata
// └───────────────────┴──────────────────────────────────────────┘ │
let mut lengths = Vec::with_capacity(n_changes);
for _ in 0..n_changes - 1 {
lengths.push(leb128::read::unsigned(&mut bytes).unwrap() as Counter);
}
lengths.push(counter_len - lengths.iter().sum::<i32>());
// ┌───────────────────┬────────────────────────┬─────────────────┐ │
// │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘
// └───────────────────┴────────────────────────┴─────────────────┘
let dep_self_decoder = BoolRleDecoder::new(bytes);
let (dep_self, bytes) = dep_self_decoder.take_n_finalize(n_changes).unwrap();
let dep_len_decoder = AnyRleDecoder::<usize>::new(bytes);
let (deps_len, bytes) = dep_len_decoder.take_n_finalize(n_changes).unwrap();
let other_dep_num = deps_len.iter().sum::<usize>();
let dep_peer_decoder = AnyRleDecoder::<usize>::new(bytes);
let (dep_peers, bytes) = dep_peer_decoder.take_n_finalize(other_dep_num).unwrap();
let mut deps_peers_iter = dep_peers.into_iter();
let dep_counter_decoder = DeltaOfDeltaDecoder::<u32>::new(bytes).unwrap();
let (dep_counters, bytes) = dep_counter_decoder.take_n_finalize(other_dep_num).unwrap();
let mut deps_counters_iter = dep_counters.into_iter();
let mut deps = Vec::with_capacity(n_changes);
for i in 0..n_changes {
let mut f = Frontiers::default();
if dep_self[i] {
f.push(ID::new(peers[0], this_counter - 1))
}
let len = deps_len[i];
for _ in 0..len {
let peer_idx = deps_peers_iter.next().unwrap();
let peer = peers[peer_idx];
let counter = deps_counters_iter.next().unwrap() as Counter;
f.push(ID::new(peer, counter));
}
deps.push(f);
this_counter += lengths[i];
}
let mut counters = Vec::with_capacity(n_changes);
let mut last = first_counter;
for i in 0..n_changes {
counters.push(last);
last += lengths[i];
}
let lamport_decoder = DeltaOfDeltaDecoder::new(bytes).unwrap();
let (mut lamports, rest) = lamport_decoder
.take_n_finalize(n_changes.saturating_sub(1))
.unwrap();
// the last lamport
lamports.push((lamport_start + lamport_len - *lengths.last().unwrap_or(&0) as u32) as Lamport);
// we need counter range, so encode
counters.push(first_counter + counter_len);
debug_assert!(rest.is_empty());
ChangesBlockHeader {
peer: peers[0],
counter: first_counter,
n_changes,
peers,
counters,
deps_groups: deps,
lamports,
keys: OnceCell::new(),
cids: OnceCell::new(),
}
}
struct EncodedDeps {
peer_idx: AnyRleEncoder<u32>,
counter: DeltaOfDeltaEncoder,
}

View file

@ -1,6 +1,5 @@
use std::{
borrow::Cow,
collections::BTreeMap,
io::Write,
sync::{
atomic::{AtomicU64, AtomicU8, Ordering},

View file

@ -1,19 +1,14 @@
#[cfg(feature = "counter")]
use super::counter_state::CounterState;
use super::{ContainerCreationContext, MovableListState, State, TreeState};
use super::{ContainerCreationContext, State};
use crate::{
arena::SharedArena,
configure::Configure,
container::idx::ContainerIdx,
state::{FastStateSnapshot, RichtextState},
utils::kv_wrapper::KvWrapper,
version::Frontiers,
VersionVector,
};
use bytes::Bytes;
use fxhash::{FxHashMap, FxHashSet};
use inner_store::InnerStore;
use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue};
use loro_common::{LoroResult, LoroValue};
use std::sync::{atomic::AtomicU64, Arc, Mutex};
pub(crate) use container_wrapper::ContainerWrapper;

View file

@ -1,14 +1,13 @@
use std::ops::Bound;
use bytes::Bytes;
use fxhash::{FxHashMap, FxHashSet};
use fxhash::FxHashMap;
use loro_common::ContainerID;
use tracing::trace;
use crate::{
arena::SharedArena,
container::idx::ContainerIdx,
state::{container_store::FRONTIERS_KEY, ContainerCreationContext},
state::container_store::FRONTIERS_KEY,
utils::kv_wrapper::KvWrapper,
version::Frontiers,
};

View file

@ -18,8 +18,7 @@ use crate::{
handler::ValueOrHandler,
op::{ListSlice, Op, RawOp},
state::movable_list_state::inner::PushElemInfo,
txn::Transaction,
ApplyDiff, DocState, ListDiff,
txn::Transaction, DocState, ListDiff,
};
use self::{

View file

@ -5,7 +5,7 @@ use std::{
};
use bytes::Bytes;
use loro_kv_store::{compress::CompressionType, mem_store::MemKvConfig, MemKvStore};
use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
use crate::kv_store::KvStore;

View file

@ -1,4 +1,4 @@
use std::{cmp::Ordering, sync::Arc, time::Instant};
use std::{cmp::Ordering, sync::Arc};
use loro_internal::{
change::{Change, Lamport, Timestamp},

View file

@ -12,7 +12,6 @@ use loro_internal::cursor::Side;
use loro_internal::encoding::ImportBlobMetadata;
use loro_internal::handler::HandlerTrait;
use loro_internal::handler::ValueOrHandler;
use loro_internal::json::JsonChange;
use loro_internal::obs::LocalUpdateCallback;
use loro_internal::undo::{OnPop, OnPush};
use loro_internal::version::ImVersionVector;

View file

@ -1,4 +1,3 @@
use dev_utils::setup_test_log;
use loro::LoroDoc;
mod gc_test;
@ -6,7 +5,7 @@ mod undo_test;
fn gen_action(doc: &LoroDoc, seed: u64, mut ops_len: usize) {
let mut rng = StdRng::seed_from_u64(seed);
use loro::{ContainerType, LoroValue};
use loro::LoroValue;
use rand::prelude::*;
let root_map = doc.get_map("root");