loro/crates/loro-internal/src/encoding.rs

139 lines
4.4 KiB
Rust
Raw Normal View History

use fxhash::FxHashMap;
use loro_common::PeerID;
use rle::RleVec;
use crate::{change::Change, op::RemoteOp};
pub(crate) type ClientChanges = FxHashMap<PeerID, RleVec<[Change; 0]>>;
pub(crate) type RemoteClientChanges<'a> = FxHashMap<PeerID, Vec<Change<RemoteOp<'a>>>>;
mod encode_changes;
mod encode_enhanced;
mod encode_updates;
use rle::HasLength;
use crate::{oplog::OpLog, LoroError, VersionVector};
use self::{
encode_changes::{decode_oplog_changes, encode_oplog_changes},
encode_updates::decode_oplog_updates,
};
2023-08-29 02:38:48 +00:00
pub(crate) use encode_enhanced::{decode_oplog_v2, encode_oplog_v2};
pub(crate) use encode_updates::encode_oplog_updates;
pub(crate) const COMPRESS_RLE_THRESHOLD: usize = 20 * 1024;
// TODO: Test this threshold
pub(crate) const UPDATE_ENCODE_THRESHOLD: usize = 512;
pub(crate) const MAGIC_BYTES: [u8; 4] = [0x6c, 0x6f, 0x72, 0x6f];
pub(crate) const ENCODE_SCHEMA_VERSION: u8 = 0;
pub enum EncodeMode {
Auto(VersionVector),
Updates(VersionVector),
RleUpdates(VersionVector),
Snapshot,
CompressRleUpdates(VersionVector),
2023-08-29 02:38:48 +00:00
RleUpdatesV2(VersionVector),
}
impl EncodeMode {
pub fn to_byte(&self) -> u8 {
match self {
EncodeMode::Auto(_) => unreachable!(),
EncodeMode::Updates(_) => 0,
EncodeMode::RleUpdates(_) => 1,
EncodeMode::Snapshot => 2,
EncodeMode::CompressRleUpdates(_) => 3,
2023-08-29 02:38:48 +00:00
EncodeMode::RleUpdatesV2(_) => 4,
}
}
}
pub enum ConcreteEncodeMode {
Updates = 0,
RleUpdates = 1,
Snapshot = 2,
CompressedRleUpdates = 3,
2023-08-29 02:38:48 +00:00
RleUpdatesV2 = 4,
}
impl From<u8> for ConcreteEncodeMode {
fn from(value: u8) -> Self {
match value {
0 => ConcreteEncodeMode::Updates,
1 => ConcreteEncodeMode::RleUpdates,
2 => ConcreteEncodeMode::Snapshot,
3 => ConcreteEncodeMode::CompressedRleUpdates,
2023-08-29 02:38:48 +00:00
4 => ConcreteEncodeMode::RleUpdatesV2,
_ => unreachable!(),
}
}
}
pub(crate) fn encode_oplog(oplog: &OpLog, mode: EncodeMode) -> Vec<u8> {
let version = ENCODE_SCHEMA_VERSION;
let mut ans = Vec::from(MAGIC_BYTES);
// maybe u8 is enough
ans.push(version);
let mode = match mode {
EncodeMode::Auto(vv) => {
let self_vv = oplog.vv();
let diff = self_vv.diff(&vv);
let update_total_len = diff
.left
.values()
.map(|value| value.atom_len())
.sum::<usize>();
2023-08-29 09:15:41 +00:00
// EncodeMode::RleUpdates(vv)
if update_total_len <= UPDATE_ENCODE_THRESHOLD {
EncodeMode::Updates(vv)
} else {
2023-08-29 02:38:48 +00:00
EncodeMode::RleUpdatesV2(vv)
}
}
mode => mode,
};
let encoded = match &mode {
EncodeMode::Updates(vv) => encode_oplog_updates(oplog, vv),
EncodeMode::RleUpdates(vv) => encode_oplog_changes(oplog, vv),
EncodeMode::CompressRleUpdates(vv) => {
let bytes = encode_oplog_changes(oplog, vv);
miniz_oxide::deflate::compress_to_vec(&bytes, 7)
}
EncodeMode::Snapshot => unimplemented!(),
2023-08-29 09:15:41 +00:00
EncodeMode::RleUpdatesV2(vv) => encode_oplog_v2(oplog, vv),
_ => unreachable!(),
};
ans.push(mode.to_byte());
ans.extend(encoded);
ans
}
pub(crate) fn decode_oplog(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> {
let (magic_bytes, input) = input.split_at(4);
let magic_bytes: [u8; 4] = magic_bytes.try_into().unwrap();
if magic_bytes != MAGIC_BYTES {
return Err(LoroError::DecodeError("Invalid header bytes".into()));
}
let (version, input) = input.split_at(1);
if version != [ENCODE_SCHEMA_VERSION] {
return Err(LoroError::DecodeError("Invalid version".into()));
}
let mode: ConcreteEncodeMode = input[0].into();
let decoded = &input[1..];
match mode {
ConcreteEncodeMode::Updates => decode_oplog_updates(oplog, decoded),
ConcreteEncodeMode::RleUpdates => decode_oplog_changes(oplog, decoded),
ConcreteEncodeMode::CompressedRleUpdates => {
miniz_oxide::inflate::decompress_to_vec(decoded)
.map_err(|_| LoroError::DecodeError("Invalid compressed data".into()))
.and_then(|bytes| decode_oplog_changes(oplog, &bytes))
}
ConcreteEncodeMode::Snapshot => unimplemented!(),
2023-08-29 09:15:41 +00:00
ConcreteEncodeMode::RleUpdatesV2 => decode_oplog_v2(oplog, decoded),
}
}