fix: meta first last key

This commit is contained in:
Leon Zhao 2024-08-20 21:25:14 +08:00
parent e1384fd857
commit a069c1ed37
No known key found for this signature in database
GPG key ID: 449E9D4257BA04DE

View file

@ -101,10 +101,22 @@ impl NormalBlock {
fn first_key(&self)->Bytes{ fn first_key(&self)->Bytes{
let mut buf = self.data.as_ref(); let mut buf = self.data.as_ref();
// skip common prefix of the first key
buf.get_u8(); buf.get_u8();
let key_len = buf.get_u16() as usize; let key_len = buf.get_u16() as usize;
Bytes::from(buf[..key_len].to_vec()) self.data.slice(SIZE_OF_U8 + SIZE_OF_U16..SIZE_OF_U8 + SIZE_OF_U16 + key_len)
}
fn last_key(&self)->Bytes{
let offset = self.offsets.last().map_or(0, |&o| o as usize);
let mut buf = self.data.slice(offset..);
let common = buf.get_u8();
let key_len = buf.get_u16() as usize;
let key = buf.slice(.. key_len);
let prefix = self.first_key().slice(..common as usize);
let mut last_key = Vec::with_capacity(prefix.len() + key.len());
last_key.extend(prefix);
last_key.extend(key);
last_key.into()
} }
} }
@ -133,6 +145,13 @@ impl Block{
} }
} }
pub fn last_key(&self)->Bytes{
match self{
Block::Normal(block)=>block.last_key(),
Block::Large(block)=>block.key(),
}
}
fn encode(&self)->Bytes{ fn encode(&self)->Bytes{
match self{ match self{
Block::Normal(block)=>block.encode(), Block::Normal(block)=>block.encode(),
@ -267,7 +286,7 @@ pub(crate) struct BlockMeta{
offset: usize, offset: usize,
is_large: bool, is_large: bool,
first_key: Bytes, first_key: Bytes,
last_key: Option<Bytes>, last_key: Bytes,
} }
impl BlockMeta{ impl BlockMeta{
@ -281,23 +300,7 @@ impl BlockMeta{
fn encode_meta(meta: &[BlockMeta], buf: &mut Vec<u8>){ fn encode_meta(meta: &[BlockMeta], buf: &mut Vec<u8>){
// the number of blocks // the number of blocks
let mut estimated_size = SIZE_OF_U32; let mut estimated_size = SIZE_OF_U32;
for m in meta{ estimated_size += meta.len() * (SIZE_OF_U32 + SIZE_OF_U8);
// offset
estimated_size += SIZE_OF_U32;
// first key length
estimated_size += SIZE_OF_U16;
// first key
estimated_size += m.first_key.len();
// is large
estimated_size += SIZE_OF_U8;
if m.is_large{
continue;
}
// last key length
estimated_size += SIZE_OF_U16;
// last key
estimated_size += m.last_key.as_ref().unwrap().len();
}
// checksum // checksum
estimated_size += SIZE_OF_U32; estimated_size += SIZE_OF_U32;
@ -306,16 +309,9 @@ impl BlockMeta{
buf.put_u32(meta.len() as u32); buf.put_u32(meta.len() as u32);
for m in meta{ for m in meta{
buf.put_u32(m.offset as u32); buf.put_u32(m.offset as u32);
buf.put_u16(m.first_key.len() as u16);
buf.put_slice(&m.first_key);
buf.put_u8(m.is_large as u8); buf.put_u8(m.is_large as u8);
if m.is_large{
continue;
}
buf.put_u16(m.last_key.as_ref().unwrap().len() as u16);
buf.put_slice(m.last_key.as_ref().unwrap());
} }
let checksum = crc32fast::hash(&buf[ori_length+4..]); let checksum = crc32fast::hash(&buf[ori_length+SIZE_OF_U32..]);
buf.put_u32(checksum); buf.put_u32(checksum);
} }
@ -325,16 +321,8 @@ impl BlockMeta{
let checksum = crc32fast::hash(&buf[..buf.remaining() - SIZE_OF_U32]); let checksum = crc32fast::hash(&buf[..buf.remaining() - SIZE_OF_U32]);
for _ in 0..num{ for _ in 0..num{
let offset = buf.get_u32() as usize; let offset = buf.get_u32() as usize;
let first_key_len = buf.get_u16() as usize;
let first_key = buf.copy_to_bytes(first_key_len);
let is_large = buf.get_u8() == 1; let is_large = buf.get_u8() == 1;
if is_large{ ans.push(BlockMeta{offset, is_large, first_key: Bytes::new(), last_key: Bytes::new()});
ans.push(BlockMeta{offset, is_large, first_key, last_key: None});
continue;
}
let last_key_len = buf.get_u16() as usize;
let last_key = buf.copy_to_bytes(last_key_len);
ans.push(BlockMeta{offset, is_large, first_key, last_key: Some(last_key)});
} }
let checksum_read = buf.get_u32(); let checksum_read = buf.get_u32();
if checksum != checksum_read{ if checksum != checksum_read{
@ -397,11 +385,17 @@ impl SsTableBuilder{
let block = builder.build(); let block = builder.build();
let encoded_bytes = block.encode(); let encoded_bytes = block.encode();
let is_large = block.is_large(); let is_large = block.is_large();
let first_key = std::mem::take(&mut self.first_key);
let last_key = std::mem::take(&mut self.last_key);
let meta = BlockMeta{ let meta = BlockMeta{
offset: self.data.len(), offset: self.data.len(),
is_large, is_large,
first_key: std::mem::take(&mut self.first_key), first_key: first_key.clone(),
last_key: if is_large{None}else{Some(std::mem::take(&mut self.last_key))} , last_key: if is_large{
first_key
}else{
last_key
} ,
}; };
self.meta.push(meta); self.meta.push(meta);
self.data.extend_from_slice(&encoded_bytes); self.data.extend_from_slice(&encoded_bytes);
@ -421,7 +415,7 @@ impl SsTableBuilder{
BlockMeta::encode_meta(&self.meta, &mut buf); BlockMeta::encode_meta(&self.meta, &mut buf);
buf.put_u32(meta_offset); buf.put_u32(meta_offset);
let first_key = self.meta.first().map(|m|m.first_key.clone()).unwrap_or_default(); let first_key = self.meta.first().map(|m|m.first_key.clone()).unwrap_or_default();
let last_key = self.meta.last().map(|m|m.last_key.clone().unwrap_or(self.meta.last().map(|m|m.first_key.clone()).unwrap_or_default())).unwrap_or_default(); let last_key = self.meta.last().map(|m|m.last_key.clone()).unwrap_or_default();
SsTable { SsTable {
data: Bytes::from(buf), data: Bytes::from(buf),
first_key, first_key,
@ -492,10 +486,10 @@ impl SsTable{
let data_len = bytes.len(); let data_len = bytes.len();
let meta_offset = (&bytes[data_len-SIZE_OF_U32..]).get_u32() as usize; let meta_offset = (&bytes[data_len-SIZE_OF_U32..]).get_u32() as usize;
let raw_meta = &bytes[meta_offset..data_len-SIZE_OF_U32]; let raw_meta = &bytes[meta_offset..data_len-SIZE_OF_U32];
let meta = BlockMeta::decode_meta(raw_meta)?; let mut meta = BlockMeta::decode_meta(raw_meta)?;
Self::check_block_checksum(&meta, &bytes, meta_offset)?; Self::check_block_checksum_first_last_key(&mut meta, &bytes, meta_offset)?;
let first_key = meta.first().map(|m|m.first_key.clone()).unwrap_or_default(); let first_key = meta.first().map(|m|m.first_key.clone()).unwrap_or_default();
let last_key = meta.last().map(|m|m.last_key.clone().unwrap_or(meta.last().map(|m|m.first_key.clone()).unwrap_or_default())).unwrap_or_default(); let last_key = meta.last().map(|m|m.last_key.clone()).unwrap_or_default();
let ans = Self { let ans = Self {
data: bytes, data: bytes,
first_key, first_key,
@ -508,7 +502,7 @@ impl SsTable{
Ok(ans) Ok(ans)
} }
fn check_block_checksum(meta: &[BlockMeta], bytes: &Bytes, meta_offset: usize)->LoroResult<()>{ fn check_block_checksum_first_last_key(meta: &mut [BlockMeta], bytes: &Bytes, meta_offset: usize)->LoroResult<()>{
for i in 0..meta.len(){ for i in 0..meta.len(){
let offset = meta[i].offset; let offset = meta[i].offset;
let offset_end = meta.get(i+1).map_or(meta_offset, |m| m.offset); let offset_end = meta.get(i+1).map_or(meta_offset, |m| m.offset);
@ -517,6 +511,11 @@ impl SsTable{
if checksum != crc32fast::hash(&raw_block_and_check[..raw_block_and_check.len() - SIZE_OF_U32]){ if checksum != crc32fast::hash(&raw_block_and_check[..raw_block_and_check.len() - SIZE_OF_U32]){
return Err(LoroError::DecodeChecksumMismatchError); return Err(LoroError::DecodeChecksumMismatchError);
} }
let meta_mut = &mut meta[i];
let block = Block::decode(raw_block_and_check, meta_mut.is_large);
meta_mut.first_key = block.first_key();
meta_mut.last_key = block.last_key();
} }
Ok(()) Ok(())
} }
@ -529,7 +528,7 @@ impl SsTable{
pub fn find_prev_block_idx(&self, key: &[u8])->usize{ pub fn find_prev_block_idx(&self, key: &[u8])->usize{
self.meta self.meta
.partition_point(|meta| meta.last_key.as_ref().unwrap_or(&meta.first_key) <= key) .partition_point(|meta| meta.last_key.as_ref() <= key)
} }
fn read_block(&self, block_idx: usize)->Arc<Block>{ fn read_block(&self, block_idx: usize)->Arc<Block>{
@ -719,7 +718,7 @@ impl<'a> SsTableIter<'a>{
} }
} }
if let Some(key) = end_excluded { if let Some(key) = end_excluded {
if ans.is_prev_valid() && ans.prev_key() == key{ if ans.is_prev_valid() && ans.prev_key() == key{
ans.prev(); ans.prev();
} }
} }
@ -735,6 +734,7 @@ impl<'a> SsTableIter<'a>{
} }
pub fn is_next_valid(&self)->bool{ pub fn is_next_valid(&self)->bool{
// TODO: the next iter maybe not valid too
self.next_block_iter.next_is_valid() || (self.next_block_idx as isize) < self.prev_block_idx self.next_block_iter.next_is_valid() || (self.next_block_idx as isize) < self.prev_block_idx
} }
@ -750,6 +750,7 @@ impl<'a> SsTableIter<'a>{
if self.next_first{ if self.next_first{
self.next_block_iter.prev_is_valid() self.next_block_iter.prev_is_valid()
}else{ }else{
// TODO: the next prev iter maybe not valid too
self.prev_block_iter.prev_is_valid() || (self.next_block_idx as isize) < self.prev_block_idx self.prev_block_iter.prev_is_valid() || (self.next_block_idx as isize) < self.prev_block_idx
} }
} }