refactor: wrap block in Arc

This commit is contained in:
Zixuan Chen 2024-05-29 18:09:02 +08:00
parent fd6e945d4b
commit 8e0a4c6cc8
No known key found for this signature in database

View file

@ -15,7 +15,7 @@ const MAX_BLOCK_SIZE: usize = 1024 * 4;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ChangeStore { pub struct ChangeStore {
arena: SharedArena, arena: SharedArena,
kv: BTreeMap<ID, ChangesBlock>, kv: BTreeMap<ID, Arc<ChangesBlock>>,
} }
impl ChangeStore { impl ChangeStore {
@ -37,7 +37,8 @@ impl ChangeStore {
} }
} }
self.kv.insert(id, ChangesBlock::new(change, &self.arena)); self.kv
.insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
} }
pub fn insert_block(&mut self, block: ChangesBlock) { pub fn insert_block(&mut self, block: ChangesBlock) {
@ -48,10 +49,10 @@ impl ChangeStore {
self.kv.len() self.kv.len()
} }
pub(crate) fn iter_bytes(&mut self) -> impl Iterator<Item = (ID, &'_ ChangesBlockBytes)> + '_ { pub(crate) fn iter_bytes(&mut self) -> impl Iterator<Item = (ID, ChangesBlockBytes)> + '_ {
self.kv self.kv
.iter_mut() .iter_mut()
.map(|(id, block)| (*id, block.content.bytes(&self.arena))) .map(|(id, block)| (*id, block.bytes(&self.arena)))
} }
pub(crate) fn encode_all(&mut self) -> Vec<u8> { pub(crate) fn encode_all(&mut self) -> Vec<u8> {
@ -73,7 +74,7 @@ impl ChangeStore {
let size = leb128::read::unsigned(&mut reader).unwrap(); let size = leb128::read::unsigned(&mut reader).unwrap();
let block_bytes = &reader[0..size as usize]; let block_bytes = &reader[0..size as usize];
let block = ChangesBlock::from_bytes(Bytes::copy_from_slice(block_bytes), &self.arena)?; let block = ChangesBlock::from_bytes(Bytes::copy_from_slice(block_bytes), &self.arena)?;
self.kv.insert(block.id(), block); self.kv.insert(block.id(), Arc::new(block));
reader = &reader[size as usize..]; reader = &reader[size as usize..];
} }
@ -116,7 +117,7 @@ impl ChangesBlock {
let lamport_range = (change.lamport, change.lamport + atom_len as Lamport); let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
let estimated_size = change.estimate_storage_size(); let estimated_size = change.estimate_storage_size();
let peer = change.id.peer; let peer = change.id.peer;
let content = ChangesBlockContent::Changes(vec![change]); let content = ChangesBlockContent::Changes(Arc::new(vec![change]));
Self { Self {
arena: a.clone(), arena: a.clone(),
peer, peer,
@ -155,14 +156,20 @@ impl ChangesBlock {
self.estimated_size > MAX_BLOCK_SIZE self.estimated_size > MAX_BLOCK_SIZE
} }
pub fn push_change(&mut self, change: Change) -> Result<(), Change> { pub fn push_change(self: &mut Arc<Self>, change: Change) -> Result<(), Change> {
if self.counter_range.1 != change.id.counter {
return Err(change);
}
let atom_len = change.atom_len(); let atom_len = change.atom_len();
let next_lamport = change.lamport + atom_len as Lamport; let next_lamport = change.lamport + atom_len as Lamport;
let next_counter = change.id.counter + atom_len as Counter; let next_counter = change.id.counter + atom_len as Counter;
let is_full = self.is_full(); let is_full = self.is_full();
let changes = self.content.changes_mut().unwrap(); let this = Arc::make_mut(self);
let changes = this.content.changes_mut().unwrap();
let merge_interval = 10000; // TODO: FIXME: Use configure let merge_interval = 10000; // TODO: FIXME: Use configure
let changes = Arc::make_mut(changes);
match changes.last_mut() { match changes.last_mut() {
Some(last) Some(last)
if change.deps_on_self() if change.deps_on_self()
@ -174,7 +181,7 @@ impl ChangesBlock {
for op in change.ops.into_iter() { for op in change.ops.into_iter() {
let size = op.estimate_storage_size(); let size = op.estimate_storage_size();
if !last.ops.push(op) { if !last.ops.push(op) {
self.estimated_size += size; this.estimated_size += size;
} }
} }
} }
@ -182,17 +189,31 @@ impl ChangesBlock {
if is_full { if is_full {
return Err(change); return Err(change);
} else { } else {
self.estimated_size += change.estimate_storage_size(); this.estimated_size += change.estimate_storage_size();
changes.push(change); changes.push(change);
} }
} }
} }
self.counter_range.1 = next_counter; this.counter_range.1 = next_counter;
self.lamport_range.1 = next_lamport; this.lamport_range.1 = next_lamport;
Ok(()) Ok(())
} }
pub fn bytes<'a>(self: &'a mut Arc<Self>, a: &SharedArena) -> ChangesBlockBytes {
match &self.content {
ChangesBlockContent::Bytes(bytes) => bytes.clone(),
ChangesBlockContent::Both(_, bytes) => bytes.clone(),
ChangesBlockContent::Changes(changes) => {
let bytes = ChangesBlockBytes::serialize(changes, a);
let c = Arc::clone(&changes);
let this = Arc::make_mut(self);
this.content = ChangesBlockContent::Both(c, bytes.clone());
bytes
}
}
}
fn id(&self) -> ID { fn id(&self) -> ID {
ID::new(self.peer, self.counter_range.0) ID::new(self.peer, self.counter_range.0)
} }
@ -200,9 +221,9 @@ impl ChangesBlock {
#[derive(Clone)] #[derive(Clone)]
enum ChangesBlockContent { enum ChangesBlockContent {
Changes(Vec<Change>), Changes(Arc<Vec<Change>>),
Bytes(ChangesBlockBytes), Bytes(ChangesBlockBytes),
Both(Vec<Change>, ChangesBlockBytes), Both(Arc<Vec<Change>>, ChangesBlockBytes),
} }
impl ChangesBlockContent { impl ChangesBlockContent {
@ -212,26 +233,14 @@ impl ChangesBlockContent {
ChangesBlockContent::Both(changes, _) => Ok(changes), ChangesBlockContent::Both(changes, _) => Ok(changes),
ChangesBlockContent::Bytes(bytes) => { ChangesBlockContent::Bytes(bytes) => {
let changes = bytes.parse(&SharedArena::new())?; let changes = bytes.parse(&SharedArena::new())?;
*self = ChangesBlockContent::Both(changes, bytes.clone()); *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone());
self.changes() self.changes()
} }
} }
} }
pub fn bytes(&mut self, a: &SharedArena) -> &ChangesBlockBytes {
match self {
ChangesBlockContent::Bytes(bytes) => bytes,
ChangesBlockContent::Both(_, bytes) => bytes,
ChangesBlockContent::Changes(changes) => {
let bytes = ChangesBlockBytes::serialize(changes, a);
*self = ChangesBlockContent::Both(std::mem::take(changes), bytes);
self.bytes(a)
}
}
}
/// Note that this method will invalidate the stored bytes /// Note that this method will invalidate the stored bytes
fn changes_mut(&mut self) -> LoroResult<&mut Vec<Change>> { fn changes_mut(&mut self) -> LoroResult<&mut Arc<Vec<Change>>> {
match self { match self {
ChangesBlockContent::Changes(changes) => Ok(changes), ChangesBlockContent::Changes(changes) => Ok(changes),
ChangesBlockContent::Both(changes, _) => { ChangesBlockContent::Both(changes, _) => {
@ -240,7 +249,7 @@ impl ChangesBlockContent {
} }
ChangesBlockContent::Bytes(bytes) => { ChangesBlockContent::Bytes(bytes) => {
let changes = bytes.parse(&SharedArena::new())?; let changes = bytes.parse(&SharedArena::new())?;
*self = ChangesBlockContent::Changes(changes); *self = ChangesBlockContent::Changes(Arc::new(changes));
self.changes_mut() self.changes_mut()
} }
} }
@ -265,10 +274,11 @@ impl std::fmt::Debug for ChangesBlockContent {
} }
} }
/// It's cheap to clone this struct because it's cheap to clone the bytes
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ChangesBlockBytes { pub(crate) struct ChangesBlockBytes {
bytes: Bytes, bytes: Bytes,
header: Option<ChangesBlockHeader>, header: Option<Arc<ChangesBlockHeader>>,
} }
impl ChangesBlockBytes { impl ChangesBlockBytes {
@ -281,14 +291,14 @@ impl ChangesBlockBytes {
fn ensure_header(&mut self) -> LoroResult<()> { fn ensure_header(&mut self) -> LoroResult<()> {
if self.header.is_none() { if self.header.is_none() {
self.header = Some(decode_header(&self.bytes)?); self.header = Some(Arc::new(decode_header(&self.bytes)?));
} }
Ok(()) Ok(())
} }
fn parse(&mut self, a: &SharedArena) -> LoroResult<Vec<Change>> { fn parse(&mut self, a: &SharedArena) -> LoroResult<Vec<Change>> {
self.ensure_header()?; self.ensure_header()?;
decode_block(&self.bytes, a, self.header.as_ref()) decode_block(&self.bytes, a, self.header.as_ref().map(|h| h.as_ref()))
} }
fn serialize(changes: &[Change], a: &SharedArena) -> Self { fn serialize(changes: &[Change], a: &SharedArena) -> Self {