fix: split large change when importing

Zixuan Chen 2024-08-09 17:27:07 +08:00
parent 42329a20ea
commit bdc8b4b908
No known key found for this signature in database
10 changed files with 92 additions and 29 deletions

Cargo.lock (generated)
View file

@@ -1874,9 +1874,9 @@ dependencies = [
[[package]]
name = "serde_columnar"
version = "0.3.6"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7947238638e841d935a1dadff54b74575ae60d51b977be75dab16e7638ea6e7d"
checksum = "3748cbf2b43a15ee9627881cabd7820d50508781cdebd3bb54cea49215d367a1"
dependencies = [
"itertools 0.11.0",
"postcard",

View file

@@ -18,10 +18,8 @@ resolver = "2"
enum_dispatch = "0.3.11"
enum-as-inner = "0.5.1"
fxhash = "0.2.1"
tracing = { version = "0.1", features = [
"release_max_level_warn",
] }
serde_columnar = { version = "0.3.5" }
tracing = { version = "0.1", features = ["release_max_level_warn"] }
serde_columnar = { version = "0.3.7" }
serde_json = "1.0"
smallvec = { version = "1.8.0", features = ["serde"] }
itertools = "0.12.1"

View file

@@ -56,7 +56,7 @@ pub fn main() {
);
// Checkout after compact change store
doc.checkout(&ID::new(doc.peer_id(), 100).into()).unwrap();
doc.check_state_correctness_slow();
let after_checkout = get_mem_usage();
println!("Allocated bytes after checkout: {}", after_checkout);
}

View file

@@ -10,11 +10,6 @@ pub struct MapSet {
// the key is deleted if value is None
pub(crate) value: Option<LoroValue>,
}
impl MapSet {
pub(crate) fn estimate_storage_size(&self) -> usize {
self.key.len() + 3
}
}
impl Mergable for MapSet {}
impl Sliceable for MapSet {

View file

@@ -53,11 +53,6 @@ impl TreeOp {
TreeOp::Delete { .. } => None,
}
}
pub(crate) fn estimate_storage_size(&self) -> usize {
// TODO: use a better value
20
}
}
impl HasLength for TreeOp {

View file

@@ -29,12 +29,9 @@ pub struct Op {
impl EstimatedSize for Op {
fn estimate_storage_size(&self) -> usize {
let counter_size = 4;
let container_size = 2;
let content_size = self
.content
.estimate_storage_size(self.container.get_type());
counter_size + container_size + content_size
// TODO: use benchmark to get the optimal estimated size for each container type
self.content
.estimate_storage_size(self.container.get_type())
}
}

View file

@@ -68,8 +68,8 @@ impl InnerContent {
pub fn estimate_storage_size(&self, kind: ContainerType) -> usize {
match self {
InnerContent::List(l) => l.estimate_storage_size(kind),
InnerContent::Map(m) => m.estimate_storage_size(),
InnerContent::Tree(t) => t.estimate_storage_size(),
InnerContent::Map(_) => 3,
InnerContent::Tree(_) => 8,
InnerContent::Future(f) => f.estimate_storage_size(),
}
}
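
Aside, not part of the commit: a minimal sketch of the size-estimation shape the two hunks above converge on. The EstimatedSize trait, the delegation from Op to its content, and the flat constants (3 bytes per map op, 8 per tree op) come from the diff; the stand-in types, the text formula, and dropping the ContainerType parameter are simplifications for illustration only.

trait EstimatedSize {
    /// Rough number of bytes this item will occupy once encoded into a block.
    fn estimate_storage_size(&self) -> usize;
}

/// Simplified stand-in for `InnerContent`: just enough to show the heuristic.
enum SketchContent {
    Text { len: usize },
    Map,
    Tree,
}

impl SketchContent {
    fn estimate_storage_size(&self) -> usize {
        match self {
            // List/text ops scale with the inserted content (formula illustrative).
            SketchContent::Text { len } => *len + 4,
            // Map and tree ops use the flat constants from this commit.
            SketchContent::Map => 3,
            SketchContent::Tree => 8,
        }
    }
}

/// Simplified stand-in for `Op`: after this commit the op-level estimate is
/// just the content estimate, with no extra per-op counter/container bytes.
struct SketchOp {
    content: SketchContent,
}

impl EstimatedSize for SketchOp {
    fn estimate_storage_size(&self) -> usize {
        self.content.estimate_storage_size()
    }
}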

View file

@@ -257,8 +257,8 @@ impl OpLog {
.lock()
.unwrap()
.insert_by_new_change(&change, true, true);
self.change_store.insert_change(change.clone());
self.register_container_and_parent_link(&change);
self.change_store.insert_change(change);
}
#[inline(always)]

View file

@@ -6,7 +6,7 @@ use loro_common::{
PeerID, ID,
};
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable};
use rle::{HasLength, Mergable, RleVec, Sliceable};
use std::{
cmp::Ordering,
collections::{BTreeMap, VecDeque},
@@ -72,6 +72,12 @@ impl ChangeStore {
}
pub fn insert_change(&mut self, mut change: Change) {
let estimated_size = change.estimate_storage_size();
if estimated_size > MAX_BLOCK_SIZE {
self.split_change_then_insert(change);
return;
}
let id = change.id;
let mut kv = self.mem_parsed_kv.lock().unwrap();
if let Some(old) = self.vv.get_mut(&change.id.peer) {
@@ -85,6 +91,11 @@
);
}
} else {
// TODO: make this work and remove the assert
assert_eq!(
change.id.counter, 0,
"inserting a new change with counter != 0 and not in vv"
);
self.vv.insert(
change.id.peer,
change.id.counter + change.atom_len() as Counter,
@@ -99,6 +110,7 @@
match block.push_change(
change,
estimated_size,
self.merge_interval
.load(std::sync::atomic::Ordering::Acquire),
) {
@@ -113,6 +125,48 @@
kv.insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
}
fn split_change_then_insert(&mut self, change: Change) {
let original_len = change.atom_len();
let mut new_change = Change {
ops: RleVec::new(),
deps: change.deps,
id: change.id,
lamport: change.lamport,
timestamp: change.timestamp,
};
let mut total_len = 0;
let mut estimated_size = 16;
for op in change.ops.into_iter() {
estimated_size += op.estimate_storage_size();
if estimated_size > MAX_BLOCK_SIZE && !new_change.ops.is_empty() {
let ctr_end = new_change.id.counter + new_change.atom_len() as Counter;
let next_lamport = new_change.lamport + new_change.atom_len() as Lamport;
total_len += new_change.atom_len();
self.insert_change(new_change);
new_change = Change {
ops: RleVec::new(),
deps: ID::new(change.id.peer, ctr_end - 1).into(),
id: ID::new(change.id.peer, ctr_end),
lamport: next_lamport,
timestamp: change.timestamp,
};
estimated_size = 16;
new_change.ops.push(op);
} else {
new_change.ops.push(op);
}
}
if !new_change.ops.is_empty() {
total_len += new_change.atom_len();
self.insert_change(new_change);
}
debug_assert_eq!(total_len, original_len);
}
/// Flush the cached change to kv_store
pub(crate) fn flush_and_compact(&mut self) {
let mut mem = self.mem_parsed_kv.lock().unwrap();
@@ -507,6 +561,7 @@ impl ChangesBlock {
pub fn push_change(
self: &mut Arc<Self>,
change: Change,
new_change_size: usize,
merge_interval: i64,
) -> Result<(), Change> {
if self.counter_range.1 != change.id.counter {
@@ -517,7 +572,7 @@
let next_lamport = change.lamport + atom_len as Lamport;
let next_counter = change.id.counter + atom_len as Counter;
let is_full = self.is_full();
let is_full = new_change_size + self.estimated_size > MAX_BLOCK_SIZE;
let this = Arc::make_mut(self);
let changes = this.content.changes_mut().unwrap();
let changes = Arc::make_mut(changes);
@@ -540,7 +595,7 @@
if is_full {
return Err(change);
} else {
this.estimated_size += change.estimate_storage_size();
this.estimated_size += new_change_size;
changes.push(change);
}
}
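
Aside, not part of the commit: a self-contained sketch of the splitting strategy that split_change_then_insert implements above. The 16-byte per-change overhead, the MAX_BLOCK_SIZE cutoff, and the counter/lamport/deps bookkeeping mirror the diff; MiniChange, MiniOp, and the concrete block size are simplified stand-ins rather than loro's real types.

const MAX_BLOCK_SIZE: usize = 4 * 1024; // illustrative; the real constant lives in change_store.rs

#[derive(Clone, Debug)]
struct MiniOp {
    len: usize,            // number of atomic items this op covers
    estimated_size: usize, // per-op storage estimate, as from `EstimatedSize`
}

#[derive(Debug)]
struct MiniChange {
    peer: u64,
    counter: i32,             // counter of the change's first op
    lamport: u32,
    dep_counter: Option<i32>, // same-peer dependency, if any
    ops: Vec<MiniOp>,
}

impl MiniChange {
    fn atom_len(&self) -> usize {
        self.ops.iter().map(|op| op.len).sum()
    }
}

/// Split one oversized change into consecutive sub-changes that each stay
/// around MAX_BLOCK_SIZE. Every sub-change after the first starts at the
/// counter/lamport where the previous one ended and depends on that previous
/// sub-change's last op, so the causal chain is preserved.
fn split_change(change: MiniChange) -> Vec<MiniChange> {
    let peer = change.peer;
    let mut out = Vec::new();
    let mut cur = MiniChange {
        peer,
        counter: change.counter,
        lamport: change.lamport,
        dep_counter: change.dep_counter,
        ops: Vec::new(),
    };
    let mut estimated = 16; // fixed per-change overhead, as in the diff
    for op in change.ops {
        estimated += op.estimated_size;
        if estimated > MAX_BLOCK_SIZE && !cur.ops.is_empty() {
            let ctr_end = cur.counter + cur.atom_len() as i32;
            let next_lamport = cur.lamport + cur.atom_len() as u32;
            out.push(cur);
            cur = MiniChange {
                peer,
                counter: ctr_end,
                lamport: next_lamport,
                dep_counter: Some(ctr_end - 1), // last op of the previous piece
                ops: Vec::new(),
            };
            estimated = 16; // reset the running estimate, as the diff does
        }
        cur.ops.push(op);
    }
    if !cur.ops.is_empty() {
        out.push(cur);
    }
    out
}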

View file

@@ -0,0 +1,23 @@
use loro::{LoroDoc, LoroMap, ID};
#[test]
fn test_compact_change_store() {
let doc = LoroDoc::new();
doc.set_peer_id(0).unwrap();
let text = doc.get_text("text");
for i in 0..100 {
text.insert(i, "hello").unwrap();
}
let list = doc.get_list("list");
for _ in 0..100 {
let map = list.push_container(LoroMap::new()).unwrap();
for j in 0..100 {
map.insert(&j.to_string(), j).unwrap();
}
}
doc.commit();
doc.compact_change_store();
doc.checkout(&ID::new(0, 60).into()).unwrap();
}
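
Aside, not part of the commit: a hedged usage sketch of the scenario the commit title describes, where an oversized change is split when it reaches the change store during import. LoroDoc::new, get_text, insert, and commit appear in the test above; export_from and import are assumed to be the crate's public update export/import API at this version.

use loro::LoroDoc;

fn import_large_change() {
    let doc = LoroDoc::new();
    let text = doc.get_text("text");
    for i in 0..10_000 {
        // All of these edits land in a single commit, i.e. one large Change.
        text.insert(i * 5, "hello").unwrap();
    }
    doc.commit();

    // Exporting the updates and importing them into another document routes
    // that Change through ChangeStore::insert_change, which now splits it
    // into block-sized pieces instead of storing one oversized block.
    let updates = doc.export_from(&Default::default());
    let doc2 = LoroDoc::new();
    doc2.import(&updates).unwrap();
}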