diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 50276797..94bd7feb 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -15,7 +15,7 @@ const MAX_BLOCK_SIZE: usize = 1024 * 4; #[derive(Debug, Clone)] pub struct ChangeStore { arena: SharedArena, - kv: BTreeMap, + kv: BTreeMap>, } impl ChangeStore { @@ -37,7 +37,8 @@ impl ChangeStore { } } - self.kv.insert(id, ChangesBlock::new(change, &self.arena)); + self.kv + .insert(id, Arc::new(ChangesBlock::new(change, &self.arena))); } pub fn insert_block(&mut self, block: ChangesBlock) { @@ -48,10 +49,10 @@ impl ChangeStore { self.kv.len() } - pub(crate) fn iter_bytes(&mut self) -> impl Iterator + '_ { + pub(crate) fn iter_bytes(&mut self) -> impl Iterator + '_ { self.kv .iter_mut() - .map(|(id, block)| (*id, block.content.bytes(&self.arena))) + .map(|(id, block)| (*id, block.bytes(&self.arena))) } pub(crate) fn encode_all(&mut self) -> Vec { @@ -73,7 +74,7 @@ impl ChangeStore { let size = leb128::read::unsigned(&mut reader).unwrap(); let block_bytes = &reader[0..size as usize]; let block = ChangesBlock::from_bytes(Bytes::copy_from_slice(block_bytes), &self.arena)?; - self.kv.insert(block.id(), block); + self.kv.insert(block.id(), Arc::new(block)); reader = &reader[size as usize..]; } @@ -116,7 +117,7 @@ impl ChangesBlock { let lamport_range = (change.lamport, change.lamport + atom_len as Lamport); let estimated_size = change.estimate_storage_size(); let peer = change.id.peer; - let content = ChangesBlockContent::Changes(vec![change]); + let content = ChangesBlockContent::Changes(Arc::new(vec![change])); Self { arena: a.clone(), peer, @@ -155,14 +156,20 @@ impl ChangesBlock { self.estimated_size > MAX_BLOCK_SIZE } - pub fn push_change(&mut self, change: Change) -> Result<(), Change> { + pub fn push_change(self: &mut Arc, change: Change) -> Result<(), Change> { + if self.counter_range.1 != change.id.counter { + return Err(change); + } + let atom_len = change.atom_len(); let next_lamport = change.lamport + atom_len as Lamport; let next_counter = change.id.counter + atom_len as Counter; let is_full = self.is_full(); - let changes = self.content.changes_mut().unwrap(); + let this = Arc::make_mut(self); + let changes = this.content.changes_mut().unwrap(); let merge_interval = 10000; // TODO: FIXME: Use configure + let changes = Arc::make_mut(changes); match changes.last_mut() { Some(last) if change.deps_on_self() @@ -174,7 +181,7 @@ impl ChangesBlock { for op in change.ops.into_iter() { let size = op.estimate_storage_size(); if !last.ops.push(op) { - self.estimated_size += size; + this.estimated_size += size; } } } @@ -182,17 +189,31 @@ impl ChangesBlock { if is_full { return Err(change); } else { - self.estimated_size += change.estimate_storage_size(); + this.estimated_size += change.estimate_storage_size(); changes.push(change); } } } - self.counter_range.1 = next_counter; - self.lamport_range.1 = next_lamport; + this.counter_range.1 = next_counter; + this.lamport_range.1 = next_lamport; Ok(()) } + pub fn bytes<'a>(self: &'a mut Arc, a: &SharedArena) -> ChangesBlockBytes { + match &self.content { + ChangesBlockContent::Bytes(bytes) => bytes.clone(), + ChangesBlockContent::Both(_, bytes) => bytes.clone(), + ChangesBlockContent::Changes(changes) => { + let bytes = ChangesBlockBytes::serialize(changes, a); + let c = Arc::clone(&changes); + let this = Arc::make_mut(self); + this.content = ChangesBlockContent::Both(c, bytes.clone()); + bytes + } + } + } + fn id(&self) -> ID { ID::new(self.peer, self.counter_range.0) } @@ -200,9 +221,9 @@ impl ChangesBlock { #[derive(Clone)] enum ChangesBlockContent { - Changes(Vec), + Changes(Arc>), Bytes(ChangesBlockBytes), - Both(Vec, ChangesBlockBytes), + Both(Arc>, ChangesBlockBytes), } impl ChangesBlockContent { @@ -212,26 +233,14 @@ impl ChangesBlockContent { ChangesBlockContent::Both(changes, _) => Ok(changes), ChangesBlockContent::Bytes(bytes) => { let changes = bytes.parse(&SharedArena::new())?; - *self = ChangesBlockContent::Both(changes, bytes.clone()); + *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone()); self.changes() } } } - pub fn bytes(&mut self, a: &SharedArena) -> &ChangesBlockBytes { - match self { - ChangesBlockContent::Bytes(bytes) => bytes, - ChangesBlockContent::Both(_, bytes) => bytes, - ChangesBlockContent::Changes(changes) => { - let bytes = ChangesBlockBytes::serialize(changes, a); - *self = ChangesBlockContent::Both(std::mem::take(changes), bytes); - self.bytes(a) - } - } - } - /// Note that this method will invalidate the stored bytes - fn changes_mut(&mut self) -> LoroResult<&mut Vec> { + fn changes_mut(&mut self) -> LoroResult<&mut Arc>> { match self { ChangesBlockContent::Changes(changes) => Ok(changes), ChangesBlockContent::Both(changes, _) => { @@ -240,7 +249,7 @@ impl ChangesBlockContent { } ChangesBlockContent::Bytes(bytes) => { let changes = bytes.parse(&SharedArena::new())?; - *self = ChangesBlockContent::Changes(changes); + *self = ChangesBlockContent::Changes(Arc::new(changes)); self.changes_mut() } } @@ -265,10 +274,11 @@ impl std::fmt::Debug for ChangesBlockContent { } } +/// It's cheap to clone this struct because it's cheap to clone the bytes #[derive(Clone)] pub(crate) struct ChangesBlockBytes { bytes: Bytes, - header: Option, + header: Option>, } impl ChangesBlockBytes { @@ -281,14 +291,14 @@ impl ChangesBlockBytes { fn ensure_header(&mut self) -> LoroResult<()> { if self.header.is_none() { - self.header = Some(decode_header(&self.bytes)?); + self.header = Some(Arc::new(decode_header(&self.bytes)?)); } Ok(()) } fn parse(&mut self, a: &SharedArena) -> LoroResult> { self.ensure_header()?; - decode_block(&self.bytes, a, self.header.as_ref()) + decode_block(&self.bytes, a, self.header.as_ref().map(|h| h.as_ref())) } fn serialize(changes: &[Change], a: &SharedArena) -> Self {