feat: support txn abort for states

This commit is contained in:
Zixuan Chen 2023-07-05 23:13:39 +08:00
parent 8f6a6e1cc2
commit fd588beee2
5 changed files with 228 additions and 5 deletions

View file

@ -24,6 +24,13 @@ pub struct AppState {
#[enum_dispatch]
pub trait ContainerState: Clone {
fn apply_diff(&mut self, diff: Diff);
/// Start a transaction
///
/// The transaction may be aborted later, then all the ops during this transaction need to be undone.
fn start_txn(&mut self);
fn abort_txn(&mut self);
fn commit_txn(&mut self);
}
#[enum_dispatch(ContainerState)]

View file

@ -13,12 +13,29 @@ use super::ContainerState;
type ContainerMapping = Arc<Mutex<FxHashMap<ContainerID, ArenaIndex>>>;
#[derive(Clone)]
pub struct ListState {
list: BTree<List>,
in_txn: bool,
undo_stack: Vec<UndoItem>,
child_container_to_leaf: Arc<Mutex<FxHashMap<ContainerID, ArenaIndex>>>,
}
impl Clone for ListState {
fn clone(&self) -> Self {
Self {
list: self.list.clone(),
in_txn: false,
undo_stack: Vec::new(),
child_container_to_leaf: Default::default(),
}
}
}
enum UndoItem {
Insert { index: usize, len: usize },
Delete { index: usize, value: LoroValue },
}
struct List;
impl BTreeTrait for List {
type Elem = LoroValue;
@ -99,6 +116,8 @@ impl ListState {
Self {
list: tree,
in_txn: false,
undo_stack: Vec::new(),
child_container_to_leaf: mapping,
}
}
@ -191,6 +210,30 @@ impl ContainerState for ListState {
}
}
}
#[doc = " Start a transaction"]
#[doc = ""]
#[doc = " The transaction may be aborted later, then all the ops during this transaction need to be undone."]
fn start_txn(&mut self) {
self.in_txn = true;
}
fn abort_txn(&mut self) {
self.in_txn = false;
while let Some(op) = self.undo_stack.pop() {
match op {
UndoItem::Insert { index, len } => {
self.delete_range(index..index + len);
}
UndoItem::Delete { index, value } => self.insert(index, value),
}
}
}
fn commit_txn(&mut self) {
self.undo_stack.clear();
self.in_txn = false;
}
}
#[cfg(test)]

View file

@ -1,3 +1,5 @@
use std::mem;
use fxhash::FxHashMap;
use crate::{delta::MapValue, event::Diff, InternalString};
@ -7,14 +9,46 @@ use super::ContainerState;
#[derive(Clone)]
pub struct MapState {
map: FxHashMap<InternalString, MapValue>,
in_txn: bool,
map_when_txn_start: FxHashMap<InternalString, Option<MapValue>>,
}
impl ContainerState for MapState {
fn apply_diff(&mut self, diff: Diff) {
if let Diff::NewMap(delta) = diff {
for (key, value) in delta.updated {
let old = self.map.insert(key.clone(), value);
self.store_txn_snapshot(key, old);
}
}
}
fn abort_txn(&mut self) {
for (key, value) in mem::take(&mut self.map_when_txn_start) {
if let Some(value) = value {
self.map.insert(key, value);
}
} else {
self.map.remove(&key);
}
}
self.in_txn = false;
}
fn start_txn(&mut self) {
self.in_txn = true;
}
fn commit_txn(&mut self) {
self.map_when_txn_start.clear();
self.in_txn = false;
}
}
impl MapState {
fn store_txn_snapshot(&mut self, key: InternalString, old: Option<MapValue>) {
if self.in_txn && !self.map_when_txn_start.contains_key(&key) {
self.map_when_txn_start.insert(key, old);
}
}
}

View file

@ -1,12 +1,40 @@
use std::ops::Range;
use jumprope::JumpRope;
use crate::{delta::DeltaItem, event::Diff};
use super::ContainerState;
#[derive(Clone)]
#[derive(Default)]
pub struct TextState {
pub(crate) rope: JumpRope,
in_txn: bool,
deleted_bytes: Vec<u8>,
undo_stack: Vec<UndoItem>,
}
impl Clone for TextState {
fn clone(&self) -> Self {
Self {
rope: self.rope.clone(),
in_txn: false,
deleted_bytes: Default::default(),
undo_stack: Default::default(),
}
}
}
enum UndoItem {
Insert {
index: u32,
len: u32,
},
Delete {
index: u32,
byte_offset: u32,
len: u32,
},
}
impl ContainerState for TextState {
@ -20,13 +48,125 @@ impl ContainerState for TextState {
}
DeltaItem::Insert { value, .. } => {
self.rope.insert(index, value);
if self.in_txn {
self.record_insert(index, value.len());
}
index += value.len();
}
DeltaItem::Delete { len, .. } => {
if self.in_txn {
self.record_del(index, *len);
}
self.rope.remove(index..index + len);
}
}
}
}
}
#[doc = " Start a transaction"]
#[doc = ""]
#[doc = " The transaction may be aborted later, then all the ops during this transaction need to be undone."]
fn start_txn(&mut self) {
self.in_txn = true;
}
fn abort_txn(&mut self) {
self.in_txn = false;
while let Some(op) = self.undo_stack.pop() {
match op {
UndoItem::Insert { index, len } => {
self.rope
.remove(index as usize..index as usize + len as usize);
}
UndoItem::Delete {
index,
byte_offset,
len,
} => {
let s = std::str::from_utf8(
&self.deleted_bytes
[byte_offset as usize..byte_offset as usize + len as usize],
)
.unwrap();
self.rope.insert(index as usize, s);
}
}
}
self.deleted_bytes.clear();
}
fn commit_txn(&mut self) {
self.deleted_bytes.clear();
self.undo_stack.clear();
self.in_txn = false;
}
}
impl TextState {
pub fn new() -> Self {
Self {
rope: JumpRope::new(),
in_txn: false,
deleted_bytes: Default::default(),
undo_stack: Default::default(),
}
}
pub fn insert(&mut self, pos: usize, s: &str) {
if self.in_txn {
self.record_insert(pos, s.len());
}
self.rope.insert(pos, s);
}
pub fn delete(&mut self, range: Range<usize>) {
if self.in_txn {
self.record_del(range.start, range.len());
}
self.rope.remove(range);
}
fn record_del(&mut self, index: usize, len: usize) {
let mut start = None;
for span in self.rope.slice_substrings(index..index + len) {
if start.is_none() {
start = Some(self.deleted_bytes.len());
}
self.deleted_bytes.extend_from_slice(span.as_bytes());
}
self.undo_stack.push(UndoItem::Delete {
index: index as u32,
byte_offset: start.unwrap() as u32,
len: len as u32,
});
}
fn record_insert(&mut self, index: usize, len: usize) {
self.undo_stack.push(UndoItem::Insert {
index: index as u32,
len: len as u32,
});
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn abort_txn() {
let mut state = TextState::new();
state.insert(0, "haha");
state.start_txn();
state.insert(4, "1234");
state.delete(2..6);
assert_eq!(state.rope.to_string(), "ha34");
state.abort_txn();
assert_eq!(state.rope.to_string(), "haha");
}
}

View file

@ -6,8 +6,7 @@ use loro_internal::{
};
pub use loro_internal::{
container::ContainerIdx, event, id::PeerID, EncodeMode, List, LoroError, LoroValue, Map, Text,
VersionVector,
event, id::PeerID, EncodeMode, List, LoroError, LoroValue, Map, Text, VersionVector,
};
#[repr(transparent)]