diff --git a/crates/loro-internal/src/kv_store/sstable.rs b/crates/loro-internal/src/kv_store/sstable.rs index 3990706a..32364d31 100644 --- a/crates/loro-internal/src/kv_store/sstable.rs +++ b/crates/loro-internal/src/kv_store/sstable.rs @@ -101,10 +101,22 @@ impl NormalBlock { fn first_key(&self)->Bytes{ let mut buf = self.data.as_ref(); - // skip common prefix of the first key buf.get_u8(); let key_len = buf.get_u16() as usize; - Bytes::from(buf[..key_len].to_vec()) + self.data.slice(SIZE_OF_U8 + SIZE_OF_U16..SIZE_OF_U8 + SIZE_OF_U16 + key_len) + } + + fn last_key(&self)->Bytes{ + let offset = self.offsets.last().map_or(0, |&o| o as usize); + let mut buf = self.data.slice(offset..); + let common = buf.get_u8(); + let key_len = buf.get_u16() as usize; + let key = buf.slice(.. key_len); + let prefix = self.first_key().slice(..common as usize); + let mut last_key = Vec::with_capacity(prefix.len() + key.len()); + last_key.extend(prefix); + last_key.extend(key); + last_key.into() } } @@ -133,6 +145,13 @@ impl Block{ } } + pub fn last_key(&self)->Bytes{ + match self{ + Block::Normal(block)=>block.last_key(), + Block::Large(block)=>block.key(), + } + } + fn encode(&self)->Bytes{ match self{ Block::Normal(block)=>block.encode(), @@ -267,7 +286,7 @@ pub(crate) struct BlockMeta{ offset: usize, is_large: bool, first_key: Bytes, - last_key: Option, + last_key: Bytes, } impl BlockMeta{ @@ -281,23 +300,7 @@ impl BlockMeta{ fn encode_meta(meta: &[BlockMeta], buf: &mut Vec){ // the number of blocks let mut estimated_size = SIZE_OF_U32; - for m in meta{ - // offset - estimated_size += SIZE_OF_U32; - // first key length - estimated_size += SIZE_OF_U16; - // first key - estimated_size += m.first_key.len(); - // is large - estimated_size += SIZE_OF_U8; - if m.is_large{ - continue; - } - // last key length - estimated_size += SIZE_OF_U16; - // last key - estimated_size += m.last_key.as_ref().unwrap().len(); - } + estimated_size += meta.len() * (SIZE_OF_U32 + SIZE_OF_U8); // checksum estimated_size += SIZE_OF_U32; @@ -306,16 +309,9 @@ impl BlockMeta{ buf.put_u32(meta.len() as u32); for m in meta{ buf.put_u32(m.offset as u32); - buf.put_u16(m.first_key.len() as u16); - buf.put_slice(&m.first_key); buf.put_u8(m.is_large as u8); - if m.is_large{ - continue; - } - buf.put_u16(m.last_key.as_ref().unwrap().len() as u16); - buf.put_slice(m.last_key.as_ref().unwrap()); } - let checksum = crc32fast::hash(&buf[ori_length+4..]); + let checksum = crc32fast::hash(&buf[ori_length+SIZE_OF_U32..]); buf.put_u32(checksum); } @@ -325,16 +321,8 @@ impl BlockMeta{ let checksum = crc32fast::hash(&buf[..buf.remaining() - SIZE_OF_U32]); for _ in 0..num{ let offset = buf.get_u32() as usize; - let first_key_len = buf.get_u16() as usize; - let first_key = buf.copy_to_bytes(first_key_len); let is_large = buf.get_u8() == 1; - if is_large{ - ans.push(BlockMeta{offset, is_large, first_key, last_key: None}); - continue; - } - let last_key_len = buf.get_u16() as usize; - let last_key = buf.copy_to_bytes(last_key_len); - ans.push(BlockMeta{offset, is_large, first_key, last_key: Some(last_key)}); + ans.push(BlockMeta{offset, is_large, first_key: Bytes::new(), last_key: Bytes::new()}); } let checksum_read = buf.get_u32(); if checksum != checksum_read{ @@ -397,11 +385,17 @@ impl SsTableBuilder{ let block = builder.build(); let encoded_bytes = block.encode(); let is_large = block.is_large(); + let first_key = std::mem::take(&mut self.first_key); + let last_key = std::mem::take(&mut self.last_key); let meta = BlockMeta{ offset: self.data.len(), is_large, - first_key: std::mem::take(&mut self.first_key), - last_key: if is_large{None}else{Some(std::mem::take(&mut self.last_key))} , + first_key: first_key.clone(), + last_key: if is_large{ + first_key + }else{ + last_key + } , }; self.meta.push(meta); self.data.extend_from_slice(&encoded_bytes); @@ -421,7 +415,7 @@ impl SsTableBuilder{ BlockMeta::encode_meta(&self.meta, &mut buf); buf.put_u32(meta_offset); let first_key = self.meta.first().map(|m|m.first_key.clone()).unwrap_or_default(); - let last_key = self.meta.last().map(|m|m.last_key.clone().unwrap_or(self.meta.last().map(|m|m.first_key.clone()).unwrap_or_default())).unwrap_or_default(); + let last_key = self.meta.last().map(|m|m.last_key.clone()).unwrap_or_default(); SsTable { data: Bytes::from(buf), first_key, @@ -492,10 +486,10 @@ impl SsTable{ let data_len = bytes.len(); let meta_offset = (&bytes[data_len-SIZE_OF_U32..]).get_u32() as usize; let raw_meta = &bytes[meta_offset..data_len-SIZE_OF_U32]; - let meta = BlockMeta::decode_meta(raw_meta)?; - Self::check_block_checksum(&meta, &bytes, meta_offset)?; + let mut meta = BlockMeta::decode_meta(raw_meta)?; + Self::check_block_checksum_first_last_key(&mut meta, &bytes, meta_offset)?; let first_key = meta.first().map(|m|m.first_key.clone()).unwrap_or_default(); - let last_key = meta.last().map(|m|m.last_key.clone().unwrap_or(meta.last().map(|m|m.first_key.clone()).unwrap_or_default())).unwrap_or_default(); + let last_key = meta.last().map(|m|m.last_key.clone()).unwrap_or_default(); let ans = Self { data: bytes, first_key, @@ -508,7 +502,7 @@ impl SsTable{ Ok(ans) } - fn check_block_checksum(meta: &[BlockMeta], bytes: &Bytes, meta_offset: usize)->LoroResult<()>{ + fn check_block_checksum_first_last_key(meta: &mut [BlockMeta], bytes: &Bytes, meta_offset: usize)->LoroResult<()>{ for i in 0..meta.len(){ let offset = meta[i].offset; let offset_end = meta.get(i+1).map_or(meta_offset, |m| m.offset); @@ -517,6 +511,11 @@ impl SsTable{ if checksum != crc32fast::hash(&raw_block_and_check[..raw_block_and_check.len() - SIZE_OF_U32]){ return Err(LoroError::DecodeChecksumMismatchError); } + + let meta_mut = &mut meta[i]; + let block = Block::decode(raw_block_and_check, meta_mut.is_large); + meta_mut.first_key = block.first_key(); + meta_mut.last_key = block.last_key(); } Ok(()) } @@ -529,7 +528,7 @@ impl SsTable{ pub fn find_prev_block_idx(&self, key: &[u8])->usize{ self.meta - .partition_point(|meta| meta.last_key.as_ref().unwrap_or(&meta.first_key) <= key) + .partition_point(|meta| meta.last_key.as_ref() <= key) } fn read_block(&self, block_idx: usize)->Arc{ @@ -719,7 +718,7 @@ impl<'a> SsTableIter<'a>{ } } if let Some(key) = end_excluded { - if ans.is_prev_valid() && ans.prev_key() == key{ + if ans.is_prev_valid() && ans.prev_key() == key{ ans.prev(); } } @@ -735,6 +734,7 @@ impl<'a> SsTableIter<'a>{ } pub fn is_next_valid(&self)->bool{ + // TODO: the next iter maybe not valid too self.next_block_iter.next_is_valid() || (self.next_block_idx as isize) < self.prev_block_idx } @@ -750,6 +750,7 @@ impl<'a> SsTableIter<'a>{ if self.next_first{ self.next_block_iter.prev_is_valid() }else{ + // TODO: the next prev iter maybe not valid too self.prev_block_iter.prev_is_valid() || (self.next_block_idx as isize) < self.prev_block_idx } }