feat: convert remote change to local change in oplog

This commit is contained in:
Zixuan Chen 2023-07-05 22:07:45 +08:00
parent a0a7dc1080
commit 8f6a6e1cc2
6 changed files with 198 additions and 34 deletions

View file

@ -31,7 +31,7 @@ pub mod map;
mod pool;
pub mod text;
pub use registry::ContainerIdx;
use registry::ContainerIdx;
// Note: It will be encoded into binary format, so the order of its fields should not be changed.
#[cfg_attr(feature = "test_utils", derive(arbitrary::Arbitrary))]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]

View file

@ -28,6 +28,12 @@ use super::{
ContainerID, ContainerTrait, ContainerType,
};
/// Inner representation for ContainerID.
///
/// It's only used inside this crate and should not be exposed to the user.
///
/// TODO: make this type pub(crate)
///
/// During a transaction, we may create some containers which are deleted later. And these containers also need a unique ContainerIdx.
/// So when we encode snapshot, we need to sort the containers by ContainerIdx and change the `container` of ops to the index of containers.
/// An empty store decodes the snapshot, it will create these containers in a sequence of natural numbers so that containers and ops can correspond one-to-one

View file

@ -1,6 +1,6 @@
use thiserror::Error;
use crate::id::PeerID;
use crate::id::{PeerID, ID};
#[derive(Error, Debug)]
pub enum LoroError {
@ -26,6 +26,8 @@ pub enum LoroError {
TempContainerError,
#[error("Index out of bound. The given pos is {pos}, but the length is {len}")]
OutOfBound { pos: usize, len: usize },
#[error("Every op id should be unique. ID {id} has been used. You should use a new PeerID to edit the content. ")]
UsedOpID { id: ID },
// #[error("the data for key `{0}` is not available")]
// Redaction(String),
// #[error("invalid header (expected {expected:?}, found {found:?})")]

View file

@ -1,11 +1,63 @@
use crate::container::ContainerID;
use append_only_bytes::{AppendOnlyBytes, BytesSlice};
use im::Vector;
use crate::{
container::{registry::ContainerIdx, ContainerID},
LoroValue,
};
/// This is shared between [OpLog] and [AppState].
/// It uses a immutable data structure inside so that we have O(1) clone time.
/// It can make sharing data between threads easier.
///
#[derive(Clone)]
#[derive(Clone, Default)]
pub(super) struct SharedArena {
container_idx_to_id: Vector<ContainerID>,
container_id_to_idx: im::HashMap<ContainerID, ContainerIdx>,
/// The parent of each container.
parents: im::HashMap<ContainerID, ContainerID>,
parents: im::HashMap<ContainerIdx, Option<ContainerIdx>>,
bytes: AppendOnlyBytes,
values: Vector<LoroValue>,
}
impl SharedArena {
pub fn register_container(&mut self, id: &ContainerID) -> ContainerIdx {
if let Some(&idx) = self.container_id_to_idx.get(id) {
return idx;
}
let idx = self.container_idx_to_id.len();
self.container_idx_to_id.push_back(id.clone());
let ans = ContainerIdx::from_u32(idx as u32);
self.container_id_to_idx.insert(id.clone(), ans);
ans
}
pub fn id_to_idx(&self, id: &ContainerID) -> Option<ContainerIdx> {
self.container_id_to_idx.get(id).copied()
}
pub fn idx_to_id(&self, id: ContainerIdx) -> Option<&ContainerID> {
self.container_idx_to_id.get(id.to_u32() as usize)
}
pub fn alloc_bytes(&mut self, bytes: &[u8]) -> BytesSlice {
let start = self.bytes.len();
self.bytes.push_slice(bytes);
self.bytes.slice(start..self.bytes.len())
}
pub fn alloc_value(&mut self, value: LoroValue) -> usize {
self.values.push_back(value);
self.values.len() - 1
}
pub fn alloc_values(&mut self, values: impl Iterator<Item = LoroValue>) -> (usize, usize) {
let start = self.values.len();
for value in values {
self.values.push_back(value);
}
(start, self.values.len())
}
}

View file

@ -5,7 +5,7 @@ use crate::container::ContainerID;
/// Calculate the diff between two versions. given [OpLog][super::oplog::OpLog]
/// and [AppState][super::state::AppState].
#[derive(Default)]
pub(super) struct DiffCalculator {
pub struct DiffCalculator {
calc: FxHashMap<ContainerID, ContainerDiffCalculator>,
}
impl DiffCalculator {

View file

@ -1,22 +1,22 @@
mod dag;
use std::mem::take;
use std::sync::atomic::{self, AtomicBool};
use std::sync::Mutex;
use fxhash::FxHashMap;
use rle::{HasIndex, HasLength, Mergable, RleVec, Sliceable};
use smallvec::SmallVec;
use crate::change::{Lamport, Timestamp};
use crate::change::{Change, Lamport, Timestamp};
use crate::container::list::list_op::{InnerListOp, ListOp};
use crate::container::map::InnerMapSet;
use crate::dag::{Dag, DagNode};
use crate::event::Diff;
use crate::id::{Counter, PeerID, ID};
use crate::log_store::ClientChanges;
use crate::op::{Op, RemoteOp};
use crate::span::{HasId, HasLamport};
use crate::text::text_content::SliceRange;
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use super::diff_calc::DiffCalculator;
use super::arena::SharedArena;
/// [OpLog] store all the ops i.e. the history.
/// It allows multiple [AppState] to attach to it.
@ -25,11 +25,14 @@ use super::diff_calc::DiffCalculator;
///
pub struct OpLog {
pub(crate) dag: AppDag,
arena: SharedArena,
pub(crate) changes: ClientChanges,
pub(crate) latest_lamport: Lamport,
pub(crate) latest_timestamp: Timestamp,
cache_diff: AtomicBool,
diff_calculator: Mutex<Option<DiffCalculator>>,
/// Pending changes that haven't been applied to the dag.
/// A change can be imported only when all its deps are already imported.
/// Key is the ID of the missing dep
pending_changes: FxHashMap<ID, Vec<Change>>,
}
/// [AppDag] maintains the causal graph of the app.
@ -55,11 +58,11 @@ impl Clone for OpLog {
fn clone(&self) -> Self {
Self {
dag: self.dag.clone(),
arena: Default::default(),
changes: self.changes.clone(),
latest_lamport: self.latest_lamport,
latest_timestamp: self.latest_timestamp,
cache_diff: AtomicBool::new(false),
diff_calculator: Mutex::new(None),
pending_changes: Default::default(),
}
}
}
@ -71,34 +74,135 @@ impl std::fmt::Debug for OpLog {
.field("changes", &self.changes)
.field("latest_lamport", &self.latest_lamport)
.field("latest_timestamp", &self.latest_timestamp)
.field("cache_diff", &self.cache_diff)
.finish()
}
}
impl OpLog {
pub fn diff(&self, before: &VersionVector, after: &VersionVector) -> Vec<Diff> {
let diff = take(&mut *self.diff_calculator.lock().unwrap()).unwrap_or_default();
let ans = diff.calc(self, before, after);
if self.cache_diff.load(atomic::Ordering::Relaxed) {
self.diff_calculator.lock().unwrap().replace(diff);
}
ans
}
pub fn toggle_fast_diff_mode(&self, on: bool) {
self.cache_diff.store(on, atomic::Ordering::Relaxed)
}
pub fn new() -> Self {
Self {
dag: AppDag::default(),
arena: Default::default(),
changes: ClientChanges::default(),
latest_lamport: Lamport::default(),
latest_timestamp: Timestamp::default(),
cache_diff: AtomicBool::new(false),
diff_calculator: Mutex::new(None),
pending_changes: Default::default(),
}
}
/// Import a change.
///
/// Pending changes that haven't been applied to the dag.
/// A change can be imported only when all its deps are already imported.
/// Key is the ID of the missing dep
///
/// # Err
///
/// Return Err(LoroError::UsedOpID) when the change's id is occupied
pub fn import_change(&mut self, change: Change<RemoteOp>) -> Result<(), LoroError> {
self.check_id_valid(change.id)?;
let change = self.convert_change(change);
if let Err(id) = self.check_deps(&change.deps) {
self.pending_changes.entry(id).or_default().push(change);
return Ok(());
}
self.changes.entry(change.id.peer).or_default().push(change);
Ok(())
}
fn check_id_valid(&self, id: ID) -> Result<(), LoroError> {
let cur_end = self.dag.vv.get(&id.peer).cloned().unwrap_or(0);
if cur_end > id.counter {
return Err(LoroError::UsedOpID { id });
}
Ok(())
}
fn check_deps(&self, deps: &Frontiers) -> Result<(), ID> {
for dep in deps.iter() {
if !self.dag.vv.includes_id(*dep) {
return Err(*dep);
}
}
Ok(())
}
fn convert_change(&mut self, change: Change<RemoteOp>) -> Change {
Change {
id: change.id,
deps: change.deps,
lamport: change.lamport,
timestamp: change.timestamp,
ops: change
.ops
.into_iter()
.flat_map(|op| self.convert_op(op))
.collect(),
}
}
fn convert_op<'a, 'b>(&'a mut self, op: RemoteOp<'b>) -> SmallVec<[Op; 3]> {
let container = self.arena.register_container(&op.container);
let counter = op.counter;
op.contents
.into_iter()
.map(move |content| match content {
crate::op::RemoteContent::Map(map) => {
let value = self.arena.alloc_value(map.value) as u32;
Op {
counter,
container,
content: crate::op::InnerContent::Map(InnerMapSet {
key: map.key,
value,
}),
}
}
crate::op::RemoteContent::List(list) => match list {
ListOp::Insert { slice, pos } => match slice {
crate::text::text_content::ListSlice::RawData(values) => {
let (from, to) = self.arena.alloc_values(values.iter().cloned());
Op {
counter,
container,
content: crate::op::InnerContent::List(InnerListOp::Insert {
slice: SliceRange::from(from as u32..to as u32),
pos,
}),
}
}
crate::text::text_content::ListSlice::RawStr(str) => {
let bytes = self.arena.alloc_bytes(str.as_bytes());
Op {
counter,
container,
content: crate::op::InnerContent::List(InnerListOp::Insert {
slice: SliceRange::from(
bytes.start() as u32..bytes.end() as u32,
),
pos,
}),
}
}
crate::text::text_content::ListSlice::Unknown(u) => Op {
counter,
container,
content: crate::op::InnerContent::List(InnerListOp::Insert {
slice: SliceRange::new_unknown(u as u32),
pos,
}),
},
},
ListOp::Delete(span) => Op {
counter,
container,
content: crate::op::InnerContent::List(InnerListOp::Delete(span)),
},
},
})
.collect()
}
}