use anyhow::Result; use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use prost::Message as _; use rpc::proto::Envelope; use std::mem::size_of; #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct MessageId(pub u32); pub type MessageLen = u32; pub const MESSAGE_LEN_SIZE: usize = size_of::(); pub fn message_len_from_buffer(buffer: &[u8]) -> MessageLen { MessageLen::from_le_bytes(buffer.try_into().unwrap()) } pub async fn read_message_with_len( stream: &mut S, buffer: &mut Vec, message_len: MessageLen, ) -> Result { buffer.resize(message_len as usize, 0); stream.read_exact(buffer).await?; Ok(Envelope::decode(buffer.as_slice())?) } pub async fn read_message( stream: &mut S, buffer: &mut Vec, ) -> Result { buffer.resize(MESSAGE_LEN_SIZE, 0); stream.read_exact(buffer).await?; let len = message_len_from_buffer(buffer); read_message_with_len(stream, buffer, len).await } pub async fn write_message( stream: &mut S, buffer: &mut Vec, message: Envelope, ) -> Result<()> { let message_len = message.encoded_len() as u32; stream .write_all(message_len.to_le_bytes().as_slice()) .await?; buffer.clear(); buffer.reserve(message_len as usize); message.encode(buffer)?; stream.write_all(buffer).await?; Ok(()) }