diff --git a/crates/loro-internal/src/container.rs b/crates/loro-internal/src/container.rs index 999b467c..774eadf4 100644 --- a/crates/loro-internal/src/container.rs +++ b/crates/loro-internal/src/container.rs @@ -31,7 +31,7 @@ pub mod map; mod pool; pub mod text; -pub use registry::ContainerIdx; +use registry::ContainerIdx; // Note: It will be encoded into binary format, so the order of its fields should not be changed. #[cfg_attr(feature = "test_utils", derive(arbitrary::Arbitrary))] #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] diff --git a/crates/loro-internal/src/container/registry.rs b/crates/loro-internal/src/container/registry.rs index ea83def1..4ec6a654 100644 --- a/crates/loro-internal/src/container/registry.rs +++ b/crates/loro-internal/src/container/registry.rs @@ -28,6 +28,12 @@ use super::{ ContainerID, ContainerTrait, ContainerType, }; +/// Inner representation for ContainerID. +/// +/// It's only used inside this crate and should not be exposed to the user. +/// +/// TODO: make this type pub(crate) +/// /// During a transaction, we may create some containers which are deleted later. And these containers also need a unique ContainerIdx. /// So when we encode snapshot, we need to sort the containers by ContainerIdx and change the `container` of ops to the index of containers. /// An empty store decodes the snapshot, it will create these containers in a sequence of natural numbers so that containers and ops can correspond one-to-one diff --git a/crates/loro-internal/src/error.rs b/crates/loro-internal/src/error.rs index fb30489e..3c5f78f5 100644 --- a/crates/loro-internal/src/error.rs +++ b/crates/loro-internal/src/error.rs @@ -1,6 +1,6 @@ use thiserror::Error; -use crate::id::PeerID; +use crate::id::{PeerID, ID}; #[derive(Error, Debug)] pub enum LoroError { @@ -26,6 +26,8 @@ pub enum LoroError { TempContainerError, #[error("Index out of bound. 
The given pos is {pos}, but the length is {len}")] OutOfBound { pos: usize, len: usize }, + #[error("Every op id should be unique. ID {id} has been used. You should use a new PeerID to edit the content. ")] + UsedOpID { id: ID }, // #[error("the data for key `{0}` is not available")] // Redaction(String), // #[error("invalid header (expected {expected:?}, found {found:?})")] diff --git a/crates/loro-internal/src/refactor/arena.rs b/crates/loro-internal/src/refactor/arena.rs index 105a07b3..559d58b4 100644 --- a/crates/loro-internal/src/refactor/arena.rs +++ b/crates/loro-internal/src/refactor/arena.rs @@ -1,11 +1,63 @@ -use crate::container::ContainerID; +use append_only_bytes::{AppendOnlyBytes, BytesSlice}; +use im::Vector; + +use crate::{ + container::{registry::ContainerIdx, ContainerID}, + LoroValue, +}; /// This is shared between [OpLog] and [AppState]. /// It uses a immutable data structure inside so that we have O(1) clone time. /// It can make sharing data between threads easier. /// -#[derive(Clone)] +#[derive(Clone, Default)] pub(super) struct SharedArena { + container_idx_to_id: Vector, + container_id_to_idx: im::HashMap, /// The parent of each container. 
- parents: im::HashMap, + parents: im::HashMap>, + bytes: AppendOnlyBytes, + values: Vector, +} + +impl SharedArena { + pub fn register_container(&mut self, id: &ContainerID) -> ContainerIdx { + if let Some(&idx) = self.container_id_to_idx.get(id) { + return idx; + } + + let idx = self.container_idx_to_id.len(); + self.container_idx_to_id.push_back(id.clone()); + let ans = ContainerIdx::from_u32(idx as u32); + self.container_id_to_idx.insert(id.clone(), ans); + ans + } + + pub fn id_to_idx(&self, id: &ContainerID) -> Option { + self.container_id_to_idx.get(id).copied() + } + + pub fn idx_to_id(&self, id: ContainerIdx) -> Option<&ContainerID> { + self.container_idx_to_id.get(id.to_u32() as usize) + } + + pub fn alloc_bytes(&mut self, bytes: &[u8]) -> BytesSlice { + let start = self.bytes.len(); + self.bytes.push_slice(bytes); + self.bytes.slice(start..self.bytes.len()) + } + + pub fn alloc_value(&mut self, value: LoroValue) -> usize { + self.values.push_back(value); + self.values.len() - 1 + } + + pub fn alloc_values(&mut self, values: impl Iterator) -> (usize, usize) { + let start = self.values.len(); + for value in values { + self.values.push_back(value); + } + + (start, self.values.len()) + } } diff --git a/crates/loro-internal/src/refactor/diff_calc.rs b/crates/loro-internal/src/refactor/diff_calc.rs index 2c2a5e6b..48cfd9e7 100644 --- a/crates/loro-internal/src/refactor/diff_calc.rs +++ b/crates/loro-internal/src/refactor/diff_calc.rs @@ -5,7 +5,7 @@ use crate::container::ContainerID; /// Calculate the diff between two versions. given [OpLog][super::oplog::OpLog] /// and [AppState][super::state::AppState]. 
#[derive(Default)] -pub(super) struct DiffCalculator { +pub struct DiffCalculator { calc: FxHashMap, } impl DiffCalculator { diff --git a/crates/loro-internal/src/refactor/oplog.rs b/crates/loro-internal/src/refactor/oplog.rs index d0a0f3c8..6feb7159 100644 --- a/crates/loro-internal/src/refactor/oplog.rs +++ b/crates/loro-internal/src/refactor/oplog.rs @@ -1,22 +1,22 @@ mod dag; -use std::mem::take; -use std::sync::atomic::{self, AtomicBool}; -use std::sync::Mutex; - use fxhash::FxHashMap; use rle::{HasIndex, HasLength, Mergable, RleVec, Sliceable}; use smallvec::SmallVec; -use crate::change::{Lamport, Timestamp}; +use crate::change::{Change, Lamport, Timestamp}; +use crate::container::list::list_op::{InnerListOp, ListOp}; +use crate::container::map::InnerMapSet; use crate::dag::{Dag, DagNode}; -use crate::event::Diff; use crate::id::{Counter, PeerID, ID}; use crate::log_store::ClientChanges; +use crate::op::{Op, RemoteOp}; use crate::span::{HasId, HasLamport}; +use crate::text::text_content::SliceRange; use crate::version::{Frontiers, ImVersionVector, VersionVector}; +use crate::LoroError; -use super::diff_calc::DiffCalculator; +use super::arena::SharedArena; /// [OpLog] store all the ops i.e. the history. /// It allows multiple [AppState] to attach to it. @@ -25,11 +25,14 @@ use super::diff_calc::DiffCalculator; /// pub struct OpLog { pub(crate) dag: AppDag, + arena: SharedArena, pub(crate) changes: ClientChanges, pub(crate) latest_lamport: Lamport, pub(crate) latest_timestamp: Timestamp, - cache_diff: AtomicBool, - diff_calculator: Mutex>, + /// Pending changes that haven't been applied to the dag. + /// A change can be imported only when all its deps are already imported. + /// Key is the ID of the missing dep + pending_changes: FxHashMap>, } /// [AppDag] maintains the causal graph of the app. 
@@ -55,11 +58,11 @@ impl Clone for OpLog { fn clone(&self) -> Self { Self { dag: self.dag.clone(), + arena: Default::default(), changes: self.changes.clone(), latest_lamport: self.latest_lamport, latest_timestamp: self.latest_timestamp, - cache_diff: AtomicBool::new(false), - diff_calculator: Mutex::new(None), + pending_changes: Default::default(), } } } @@ -71,34 +74,135 @@ impl std::fmt::Debug for OpLog { .field("changes", &self.changes) .field("latest_lamport", &self.latest_lamport) .field("latest_timestamp", &self.latest_timestamp) - .field("cache_diff", &self.cache_diff) .finish() } } impl OpLog { - pub fn diff(&self, before: &VersionVector, after: &VersionVector) -> Vec { - let diff = take(&mut *self.diff_calculator.lock().unwrap()).unwrap_or_default(); - let ans = diff.calc(self, before, after); - if self.cache_diff.load(atomic::Ordering::Relaxed) { - self.diff_calculator.lock().unwrap().replace(diff); - } - - ans - } - - pub fn toggle_fast_diff_mode(&self, on: bool) { - self.cache_diff.store(on, atomic::Ordering::Relaxed) - } - pub fn new() -> Self { Self { dag: AppDag::default(), + arena: Default::default(), changes: ClientChanges::default(), latest_lamport: Lamport::default(), latest_timestamp: Timestamp::default(), - cache_diff: AtomicBool::new(false), - diff_calculator: Mutex::new(None), + pending_changes: Default::default(), } } + + /// Import a change. + /// + /// A change can be imported only when all its deps are already imported. + /// Otherwise it is stored in `pending_changes` and this still returns `Ok(())`. 
+ /// The pending entry is keyed by the ID of the first missing dep. + /// + /// # Errors + /// + /// Returns `Err(LoroError::UsedOpID)` when the change's id is occupied + pub fn import_change(&mut self, change: Change) -> Result<(), LoroError> { + self.check_id_valid(change.id)?; + let change = self.convert_change(change); + if let Err(id) = self.check_deps(&change.deps) { + self.pending_changes.entry(id).or_default().push(change); + return Ok(()); + } + + self.changes.entry(change.id.peer).or_default().push(change); + Ok(()) + } + + fn check_id_valid(&self, id: ID) -> Result<(), LoroError> { + let cur_end = self.dag.vv.get(&id.peer).cloned().unwrap_or(0); + if cur_end > id.counter { + return Err(LoroError::UsedOpID { id }); + } + + Ok(()) + } + + fn check_deps(&self, deps: &Frontiers) -> Result<(), ID> { + for dep in deps.iter() { + if !self.dag.vv.includes_id(*dep) { + return Err(*dep); + } + } + + Ok(()) + } + + fn convert_change(&mut self, change: Change) -> Change { + Change { + id: change.id, + deps: change.deps, + lamport: change.lamport, + timestamp: change.timestamp, + ops: change + .ops + .into_iter() + .flat_map(|op| self.convert_op(op)) + .collect(), + } + } + + fn convert_op<'a, 'b>(&'a mut self, op: RemoteOp<'b>) -> SmallVec<[Op; 3]> { + let container = self.arena.register_container(&op.container); + let counter = op.counter; + op.contents + .into_iter() + .map(move |content| match content { + crate::op::RemoteContent::Map(map) => { + let value = self.arena.alloc_value(map.value) as u32; + Op { + counter, + container, + content: crate::op::InnerContent::Map(InnerMapSet { + key: map.key, + value, + }), + } + } + crate::op::RemoteContent::List(list) => match list { + ListOp::Insert { slice, pos } => match slice { + crate::text::text_content::ListSlice::RawData(values) => { + let (from, to) = self.arena.alloc_values(values.iter().cloned()); + Op { + counter, + container, + content: crate::op::InnerContent::List(InnerListOp::Insert { + slice: SliceRange::from(from as u32..to as u32), + 
pos, + }), + } + } + crate::text::text_content::ListSlice::RawStr(str) => { + let bytes = self.arena.alloc_bytes(str.as_bytes()); + Op { + counter, + container, + content: crate::op::InnerContent::List(InnerListOp::Insert { + slice: SliceRange::from( + bytes.start() as u32..bytes.end() as u32, + ), + pos, + }), + } + } + crate::text::text_content::ListSlice::Unknown(u) => Op { + counter, + container, + content: crate::op::InnerContent::List(InnerListOp::Insert { + slice: SliceRange::new_unknown(u as u32), + pos, + }), + }, + }, + ListOp::Delete(span) => Op { + counter, + container, + content: crate::op::InnerContent::List(InnerListOp::Delete(span)), + }, + }, + }) + .collect() + } }