remove dupe file

This commit is contained in:
sevki 2024-03-13 19:57:30 +00:00
parent 62f0b0f249
commit 54d69c79b4
19 changed files with 5 additions and 3116 deletions

View file

@ -3,13 +3,13 @@
# JetStream [![crates.io](https://img.shields.io/crates/v/jetstream.svg)](https://crates.io/crates/jetstream) [![docs.rs](https://docs.rs/jetstream/badge.svg)](https://docs.rs/jetstream) <!--gh actions--> ![Build Status](https://github.com/sevki/jetstream/actions/workflows/rust.yml/badge.svg) ![Build Status](https://github.com/sevki/jetstream/actions/workflows/release.yml/badge.svg)
JetStream is an RPC framework built on top of s2n-quic and p9.
JetStream is an RPC framework built on top of [s2n-quic](https://crates.io/crates/s2n-quic) and [p9](https://crates.io/crates/p9). It's designed to be a high performance, low latency, secure, and reliable RPC framework.
Features:
- Bidirectional streaming
- 0-RTT
- mTLS
- [mTLS](https://github.com/aws/s2n-quic/tree/main/examples/s2n-mtls)
- binary encoding
## Motivation

View file

@ -22,13 +22,13 @@ fn get_module_colour(module: &str) -> Color {
color
}
pub fn setup_logging() -> Logger {
pub(crate) fn setup_logging() -> Logger {
let x = drain();
slog::Logger::root(x, slog_o!())
}
pub fn drain() -> slog::Fuse<
pub(crate) fn drain() -> slog::Fuse<
slog_term::FullFormat<slog_term::PlainSyncDecorator<std::io::Stdout>>,
> {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());

View file

@ -11,6 +11,7 @@ use tower::Service;
pub use jetstream_p9_wire_format_derive::P9WireFormat;
/// Message trait for JetStream messages, which need to implement the `WireFormat` trait.
pub trait Message: WireFormat + Send + Sync {}
/// A trait for implementing a 9P service.

View file

@ -1,199 +0,0 @@
use futures_util::{FutureExt, TryFutureExt};
use p9::WireFormat;
use std::{
future::Future,
io::{self, Write},
pin::Pin,
};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter},
runtime::Handle,
};
pub trait AsyncWireFormat: std::marker::Sized {
fn encode_async<W: AsyncWriteExt + Unpin + Send>(
self,
writer: &mut W,
) -> impl std::future::Future<Output = io::Result<()>> + Send;
fn decode_async<R: AsyncReadExt + Unpin + Send>(
reader: &mut R,
) -> impl std::future::Future<Output = io::Result<Self>> + Send;
}
type Task = Pin<Box<dyn std::future::Future<Output = io::Result<()>> + Send>>;
pub trait AsyncWireFormatExt
where
Self: WireFormat + Send,
{
fn encode_async<W>(
self,
writer: W,
) -> impl Future<Output = io::Result<()>> + Send
where
Self: Sync,
W: AsyncWrite + Unpin + Send,
{
let mut writer = tokio_util::io::SyncIoBridge::new(writer);
async { tokio::task::block_in_place(move || self.encode(&mut writer)) }
}
fn decode_async<R>(
reader: R,
) -> impl Future<Output = io::Result<Self>> + Send
where
Self: Sync,
R: AsyncRead + Unpin + Send,
{
let mut reader = tokio_util::io::SyncIoBridge::new(reader);
async { tokio::task::block_in_place(move || Self::decode(&mut reader)) }
}
}
impl<T: WireFormat + Send> AsyncWireFormatExt for T {}
// tests
mod tests {
use p9::{Tframe, Tmessage, Tversion};
use tokio::{io::duplex, time::sleep};
use crate::ConvertWireFormat;
use super::*;
use std::{
io::Cursor,
pin::{self, Pin},
thread::{self, ThreadId},
time::Duration,
};
struct DelayedWriter {
inner: Vec<u8>, // Simulates a buffer
delay: Duration,
}
impl DelayedWriter {
fn new(delay: Duration) -> Self {
Self {
inner: Vec::new(),
delay,
}
}
}
struct BlockingIO<T: Sized + Unpin> {
delay: Duration,
inner: T,
read_thread_id: Option<ThreadId>,
}
impl BlockingIO<tokio::io::DuplexStream> {
fn new(delay: Duration, inner: tokio::io::DuplexStream) -> Self {
Self {
delay,
inner: inner,
read_thread_id: None,
}
}
}
impl<T> AsyncRead for BlockingIO<T>
where
T: AsyncRead + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<io::Result<()>> {
let delay = self.delay;
// If there's a delay, we schedule a sleep before proceeding with the read.
if delay > Duration::ZERO {
// This future will complete after the specified delay.
tokio::spawn(async move {
sleep(delay).await;
});
}
let mut this = self.get_mut();
// Poll the inner AsyncRead.
Pin::new(&mut this.inner).poll_read(cx, buf)
}
}
impl<T> AsyncWrite for BlockingIO<T>
where
T: AsyncWrite + Unpin,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<io::Result<usize>> {
let delay = self.delay;
// If there's a delay, we schedule a sleep before proceeding with the write.
if delay > Duration::ZERO {
// This future will complete after the specified delay.
tokio::spawn(async move {
sleep(delay).await;
});
}
let mut this = self.get_mut();
// Poll the inner AsyncWrite.
Pin::new(&mut this.inner).poll_write(cx, buf)
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<()>> {
let mut this = self.get_mut();
Pin::new(&mut this.inner).poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<()>> {
let mut this = self.get_mut();
Pin::new(&mut this.inner).poll_shutdown(cx)
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_async_wire_format() {
let test = Tframe {
tag: 0,
msg: Ok(Tmessage::Version(Tversion {
msize: 8192,
version: "9P2000.L".to_string(),
})),
};
let mut buf = Vec::new();
test.encode_async(&mut buf).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded = Tframe::decode_async(&mut cursor).await.unwrap();
assert_eq!(decoded.tag, 0);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_async_wire_format_delayed() {
let test = Tframe {
tag: 0,
msg: Ok(Tmessage::Version(Tversion {
msize: 8192,
version: "9P2000.L".to_string(),
})),
};
let (mut upstream, mut downstream) = tokio::io::duplex(1024);
let writer = BlockingIO::new(Duration::from_millis(1), upstream);
let reader = BlockingIO::new(Duration::from_millis(1), downstream);
test.encode_async(writer).await.unwrap();
let decoded = Tframe::decode_async(reader).await.unwrap();
assert_eq!(decoded.tag, 0);
}
}

View file

@ -1,9 +0,0 @@
#[cfg(test)]
use tokio::runtime::Runtime;
use super::*;
#[test]
fn it_works() {
let rt = Runtime::new().unwrap();
}

View file

@ -1,763 +0,0 @@
use std::{
collections::{BTreeMap, HashMap},
fmt::Display,
io::{Read, Write},
};
use async_trait::async_trait;
use futures::{AsyncReadExt, AsyncWriteExt};
use p9::{wire_format, Tmessage, WireFormat};
use s2n_quic::stream::{BidirectionalStream, ReceiveStream, SendStream};
use tokio::{runtime::Runtime, sync::Mutex};
use crate::versions::{self, DEFAULT_MSIZE};
pub struct NinePClient<'a> {
msize: usize,
connection: s2n_quic::Connection,
rt: &'a Runtime,
// Tag map
tags: Mutex<BTreeMap<u16, NinePClientConnection<'a>>>,
}
pub struct Name(Vec<String>);
pub struct Owned(u16, Name);
pub struct File {
pub name: Name,
}
pub struct Dir {
pub name: Name,
}
pub struct DirEntry(Name);
impl genfs::DirEntry for DirEntry {
type Path = Name;
type PathOwned = Owned;
type Metadata = std::fs::Metadata;
type FileType = std::fs::FileType;
type Error = std::io::Error;
fn path(&self) -> Self::PathOwned {
todo!()
}
fn metadata(
&self,
) -> std::prelude::v1::Result<Self::Metadata, Self::Error> {
todo!()
}
fn file_type(
&self,
) -> std::prelude::v1::Result<Self::FileType, Self::Error> {
todo!()
}
fn file_name(&self) -> &Self::Path {
todo!()
}
}
impl Iterator for Dir {
type Item = Result<DirEntry, std::io::Error>;
fn next(&mut self) -> Option<Self::Item> {
todo!()
}
}
impl genfs::Dir<DirEntry, std::io::Error> for Dir {}
impl genfs::File for File {
type Error = std::io::Error;
fn read(
&self,
buf: &mut [u8],
) -> std::prelude::v1::Result<usize, Self::Error> {
todo!()
}
fn write(
&mut self,
buf: &[u8],
) -> std::prelude::v1::Result<usize, Self::Error> {
todo!()
}
fn flush(&mut self) -> std::prelude::v1::Result<(), Self::Error> {
todo!()
}
fn seek(
&mut self,
pos: genfs::SeekFrom,
) -> std::prelude::v1::Result<u64, Self::Error> {
todo!()
}
}
/// The borrowed path slice that represents a relative or absolute path on
/// the filesystem.
impl<'a> genfs::Fs for NinePClient<'a> {
type Path = Name;
type PathOwned = Owned;
type File = File;
type Dir = Dir;
type DirEntry = DirEntry;
type Metadata = std::fs::Metadata;
type Permissions = std::fs::Permissions;
type Error = std::io::Error;
fn open(
&self,
path: &Self::Path,
options: &genfs::OpenOptions<Self::Permissions>,
) -> std::prelude::v1::Result<Self::File, Self::Error> {
todo!()
}
fn remove_file(
&mut self,
path: &Self::Path,
) -> std::prelude::v1::Result<(), Self::Error> {
todo!()
}
fn metadata(
&self,
path: &Self::Path,
) -> std::prelude::v1::Result<Self::Metadata, Self::Error> {
todo!()
}
fn symlink_metadata(
&self,
path: &Self::Path,
) -> std::prelude::v1::Result<Self::Metadata, Self::Error> {
todo!()
}
fn rename(
&mut self,
from: &Self::Path,
to: &Self::Path,
) -> std::prelude::v1::Result<(), Self::Error> {
todo!()
}
fn copy(
&mut self,
from: &Self::Path,
to: &Self::Path,
) -> std::prelude::v1::Result<u64, Self::Error> {
todo!()
}
fn hard_link(
&mut self,
src: &Self::Path,
dst: &Self::Path,
) -> std::prelude::v1::Result<(), Self::Error> {
todo!()
}
fn symlink(
&mut self,
src: &Self::Path,
dst: &Self::Path,
) -> std::prelude::v1::Result<(), Self::Error> {
todo!()
}
fn read_link(
&self,
path: &Self::Path,
) -> std::prelude::v1::Result<Self::PathOwned, Self::Error> {
todo!()
}
fn canonicalize(
&self,
path: &Self::Path,
) -> std::prelude::v1::Result<Self::PathOwned, Self::Error> {
todo!()
}
fn create_dir(
&mut self,
path: &Self::Path,
options: &genfs::DirOptions<Self::Permissions>,
) -> std::prelude::v1::Result<(), Self::Error> {
todo!()
}
fn remove_dir(
&mut self,
path: &Self::Path,
) -> std::prelude::v1::Result<(), Self::Error> {
todo!()
}
fn remove_dir_all(
&mut self,
path: &Self::Path,
) -> std::prelude::v1::Result<(), Self::Error> {
todo!()
}
fn read_dir(
&self,
path: &Self::Path,
) -> std::prelude::v1::Result<Self::Dir, Self::Error> {
todo!()
}
fn set_permissions(
&mut self,
path: &Self::Path,
perm: Self::Permissions,
) -> std::prelude::v1::Result<(), Self::Error> {
todo!()
}
}
pub struct TStream<'a> {
rt: &'a tokio::runtime::Runtime,
stream: SendStream,
}
impl<'a> TStream<'a> {
pub fn new(rt: &'a Runtime, stream: SendStream) -> Self {
Self { rt, stream }
}
}
impl<'a> std::io::Write for TStream<'a> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let _guard = self.rt.enter();
self.rt.block_on(async {
match self.stream.write(buf).await {
std::io::Result::Ok(n) => Ok(n),
Err(err) => std::io::Result::Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("failed to write to stream: {}", err),
)),
}
})
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
pub struct RStream<'a> {
rt: &'a tokio::runtime::Runtime,
stream: ReceiveStream,
}
impl<'a> RStream<'a> {
pub fn new(rt: &'a Runtime, stream: ReceiveStream) -> Self {
Self { rt, stream }
}
}
impl<'a> std::io::Read for RStream<'a> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let _guard = self.rt.enter();
self.rt.block_on(async {
match self.stream.read(buf).await {
Ok(n) => match n {
0 => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"stream closed",
)),
n => Ok(n),
},
Err(err) => Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("failed to read from stream: {}", err),
)),
}
})
}
}
impl<'a> NinePClient<'a> {
pub fn new(connection: s2n_quic::Connection, rt: &'a Runtime) -> Self {
Self {
rt,
msize: DEFAULT_MSIZE,
tags: Mutex::new(BTreeMap::new()),
connection,
}
}
}
pub struct NinePClientConnection<'a> {
fids: Mutex<HashMap<u32, u64>>,
tx: TStream<'a>,
rx: RStream<'a>,
turn: Turn,
}
impl<'a> NinePClient<'a> {
pub async fn attach(
&mut self,
tag: u16,
aname: &str,
) -> NinePClientConnection<'a> {
let bi_stream = self.connection.open_bidirectional_stream().await.unwrap();
let (recv, send) = bi_stream.split();
let mut conn = NinePClientConnection {
fids: Mutex::new(HashMap::new()),
tx: TStream::new(self.rt, send),
rx: RStream::new(self.rt, recv),
turn: Turn::Client,
};
let _ = conn.version(0, DEFAULT_MSIZE, versions::Version::V9P2024q9p.into()).await;
conn
}
}
enum Turn {
Client,
Server,
}
impl<'a> NinePClientConnection<'a> {
pub async fn attach(
&mut self,
tag: u16,
aname: &str,
uname: &str,
) -> std::prelude::v1::Result<(), std::io::Error> {
todo!()
}
pub async fn version(
&mut self,
tag: u16,
msize: usize,
version: &str,
) -> std::prelude::v1::Result<(), std::io::Error> {
todo!()
}
async fn write_message(
&mut self,
tag: u16,
msg: &Tmessage,
tx: &mut TStream<'a>,
) -> std::prelude::v1::Result<(), std::io::Error> {
let mut _guard = self.fids.lock().await;
let _ = match msg {
Tmessage::Version(version) => {
wire_format::WireFormat::encode(version, tx)
}
Tmessage::Flush(flush) => {
wire_format::WireFormat::encode(flush, tx)
}
Tmessage::Read(read) => wire_format::WireFormat::encode(read, tx),
Tmessage::Write(write) => {
wire_format::WireFormat::encode(write, tx)
}
Tmessage::Clunk(clunk) => {
wire_format::WireFormat::encode(clunk, tx)
}
Tmessage::Remove(remove) => {
wire_format::WireFormat::encode(remove, tx)
}
Tmessage::Attach(attach) => {
wire_format::WireFormat::encode(attach, tx)
}
Tmessage::Auth(auth) => wire_format::WireFormat::encode(auth, tx),
Tmessage::Statfs(statfs) => {
wire_format::WireFormat::encode(statfs, tx)
}
Tmessage::Lopen(lopen) => {
wire_format::WireFormat::encode(lopen, tx)
}
Tmessage::Lcreate(lcreate) => {
wire_format::WireFormat::encode(lcreate, tx)
}
Tmessage::Symlink(symlink) => {
wire_format::WireFormat::encode(symlink, tx)
}
Tmessage::Mknod(mknod) => {
wire_format::WireFormat::encode(mknod, tx)
}
Tmessage::Rename(rename) => {
wire_format::WireFormat::encode(rename, tx)
}
Tmessage::Readlink(readlink) => {
wire_format::WireFormat::encode(readlink, tx)
}
Tmessage::GetAttr(getattr) => {
wire_format::WireFormat::encode(getattr, tx)
}
Tmessage::SetAttr(setattr) => {
wire_format::WireFormat::encode(setattr, tx)
}
Tmessage::XattrWalk(xattrwalk) => {
wire_format::WireFormat::encode(xattrwalk, tx)
}
Tmessage::XattrCreate(xattrcreate) => {
wire_format::WireFormat::encode(xattrcreate, tx)
}
Tmessage::Readdir(readdir) => {
wire_format::WireFormat::encode(readdir, tx)
}
Tmessage::Fsync(fsync) => {
wire_format::WireFormat::encode(fsync, tx)
}
Tmessage::Lock(lock) => wire_format::WireFormat::encode(lock, tx),
Tmessage::GetLock(getlock) => {
wire_format::WireFormat::encode(getlock, tx)
}
Tmessage::Link(link) => wire_format::WireFormat::encode(link, tx),
Tmessage::Mkdir(mkdir) => {
wire_format::WireFormat::encode(mkdir, tx)
}
Tmessage::RenameAt(renameat) => {
wire_format::WireFormat::encode(renameat, tx)
}
Tmessage::UnlinkAt(unlinkat) => {
wire_format::WireFormat::encode(unlinkat, tx)
}
Tmessage::Walk(walk) => wire_format::WireFormat::encode(walk, tx),
};
Ok(())
}
}
// A reference implementation written in go.
/*
package client // import "9fans.net/go/plan9/client"
import (
"fmt"
"io"
"sync"
"sync/atomic"
"9fans.net/go/plan9"
)
type Error string
func (e Error) Error() string { return string(e) }
type Conn struct {
// We wrap the underlying conn type so that
// there's a clear distinction between Close,
// which forces a close of the underlying rwc,
// and Release, which lets the Fids take control
// of when the conn is actually closed.
mu sync.Mutex
_c *conn
released bool
}
var errClosed = fmt.Errorf("connection has been closed")
// Close forces a close of the connection and all Fids derived
// from it.
func (c *Conn) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c._c == nil {
if c.released {
return fmt.Errorf("cannot close connection after it's been released")
}
return nil
}
rwc := c._c.rwc
c._c = nil
// TODO perhaps we shouldn't hold the mutex while closing?
return rwc.Close()
}
func (c *Conn) conn() (*conn, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c._c == nil {
return nil, errClosed
}
return c._c, nil
}
// Release marks the connection so that it will
// close automatically when the last Fid derived
// from it is closed.
//
// If there are no current Fids, it closes immediately.
// After calling Release, c.Attach, c.Auth and c.Close will return
// an error.
func (c *Conn) Release() error {
c.mu.Lock()
defer c.mu.Unlock()
if c._c == nil {
return nil
}
conn := c._c
c._c = nil
c.released = true
return conn.release()
}
type conn struct {
rwc io.ReadWriteCloser
err error
tagmap map[uint16]chan *plan9.Fcall
freetag map[uint16]bool
freefid map[uint32]bool
nexttag uint16
nextfid uint32
msize uint32
version string
w, x sync.Mutex
muxer bool
refCount int32 // atomic
}
func NewConn(rwc io.ReadWriteCloser) (*Conn, error) {
c := &conn{
rwc: rwc,
tagmap: make(map[uint16]chan *plan9.Fcall),
freetag: make(map[uint16]bool),
freefid: make(map[uint32]bool),
nexttag: 1,
nextfid: 1,
msize: 131072,
version: "9P2000",
refCount: 1,
}
// XXX raw messages, not c.rpc
tx := &plan9.Fcall{Type: plan9.Tversion, Tag: plan9.NOTAG, Msize: c.msize, Version: c.version}
err := c.write(tx)
if err != nil {
return nil, err
}
rx, err := c.read()
if err != nil {
return nil, err
}
if rx.Type != plan9.Rversion || rx.Tag != plan9.NOTAG {
return nil, plan9.ProtocolError(fmt.Sprintf("invalid type/tag in Tversion exchange: %v %v", rx.Type, rx.Tag))
}
if rx.Msize > c.msize {
return nil, plan9.ProtocolError(fmt.Sprintf("invalid msize %d in Rversion", rx.Msize))
}
c.msize = rx.Msize
if rx.Version != "9P2000" {
return nil, plan9.ProtocolError(fmt.Sprintf("invalid version %s in Rversion", rx.Version))
}
return &Conn{
_c: c,
}, nil
}
func (c *conn) newFid(fid uint32, qid plan9.Qid) *Fid {
c.acquire()
return &Fid{
_c: c,
fid: fid,
qid: qid,
}
}
func (c *conn) newfidnum() (uint32, error) {
c.x.Lock()
defer c.x.Unlock()
for fidnum, _ := range c.freefid {
delete(c.freefid, fidnum)
return fidnum, nil
}
fidnum := c.nextfid
if c.nextfid == plan9.NOFID {
return 0, plan9.ProtocolError("out of fids")
}
c.nextfid++
return fidnum, nil
}
func (c *conn) putfidnum(fid uint32) {
c.x.Lock()
defer c.x.Unlock()
c.freefid[fid] = true
}
func (c *conn) newtag(ch chan *plan9.Fcall) (uint16, error) {
c.x.Lock()
defer c.x.Unlock()
var tagnum uint16
for tagnum, _ = range c.freetag {
delete(c.freetag, tagnum)
goto found
}
tagnum = c.nexttag
if c.nexttag == plan9.NOTAG {
return 0, plan9.ProtocolError("out of tags")
}
c.nexttag++
found:
c.tagmap[tagnum] = ch
if !c.muxer {
c.muxer = true
ch <- &yourTurn
}
return tagnum, nil
}
func (c *conn) puttag(tag uint16) chan *plan9.Fcall {
c.x.Lock()
defer c.x.Unlock()
ch := c.tagmap[tag]
delete(c.tagmap, tag)
c.freetag[tag] = true
return ch
}
func (c *conn) mux(rx *plan9.Fcall) {
c.x.Lock()
defer c.x.Unlock()
ch := c.tagmap[rx.Tag]
delete(c.tagmap, rx.Tag)
c.freetag[rx.Tag] = true
c.muxer = false
for _, ch2 := range c.tagmap {
c.muxer = true
ch2 <- &yourTurn
break
}
ch <- rx
}
func (c *conn) read() (*plan9.Fcall, error) {
if err := c.getErr(); err != nil {
return nil, err
}
f, err := plan9.ReadFcall(c.rwc)
if err != nil {
c.setErr(err)
return nil, err
}
return f, nil
}
func (c *conn) write(f *plan9.Fcall) error {
if err := c.getErr(); err != nil {
return err
}
err := plan9.WriteFcall(c.rwc, f)
if err != nil {
c.setErr(err)
}
return err
}
var yourTurn plan9.Fcall
func (c *conn) rpc(tx *plan9.Fcall, clunkFid *Fid) (rx *plan9.Fcall, err error) {
ch := make(chan *plan9.Fcall, 1)
tx.Tag, err = c.newtag(ch)
if err != nil {
return nil, err
}
c.w.Lock()
err = c.write(tx)
// Mark the fid as clunked inside the write lock so that we're
// sure that we don't reuse it after the sending the message
// that will clunk it, even in the presence of concurrent method
// calls on Fid.
if clunkFid != nil {
// Closing the Fid might release the conn, which
// would close the underlying rwc connection,
// which would prevent us from being able to receive the
// reply, so make sure that doesn't happen until the end
// by acquiring a reference for the duration of the call.
c.acquire()
defer c.release()
if err := clunkFid.clunked(); err != nil {
// This can happen if two clunking operations
// (e.g. Close and Remove) are invoked concurrently
c.w.Unlock()
return nil, err
}
}
c.w.Unlock()
if err != nil {
return nil, err
}
for rx = range ch {
if rx != &yourTurn {
break
}
rx, err = c.read()
if err != nil {
break
}
c.mux(rx)
}
if rx == nil {
return nil, c.getErr()
}
if rx.Type == plan9.Rerror {
return nil, Error(rx.Ename)
}
if rx.Type != tx.Type+1 {
return nil, plan9.ProtocolError("packet type mismatch")
}
return rx, nil
}
func (c *conn) acquire() {
atomic.AddInt32(&c.refCount, 1)
}
func (c *conn) release() error {
if atomic.AddInt32(&c.refCount, -1) != 0 {
return nil
}
err := c.rwc.Close()
c.setErr(errClosed)
return err
}
func (c *conn) getErr() error {
c.x.Lock()
defer c.x.Unlock()
return c.err
}
func (c *conn) setErr(err error) {
c.x.Lock()
defer c.x.Unlock()
c.err = err
}
*/
mod client_tests;

View file

@ -1,218 +0,0 @@
use std::{
collections::btree_map,
io::{self},
marker::PhantomData,
pin::Pin,
};
use futures_util::Future;
use p9::{Data, Rframe, WireFormat};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use crate::async_wire_format::{AsyncWireFormat, AsyncWireFormatExt};
use super::ninep_2000_l::NineP200L;
#[async_trait::async_trait]
trait Stat {
async fn stat(&mut self) -> p9::Rgetattr;
}
#[async_trait::async_trait]
trait File: AsyncRead + AsyncWrite + Stat {
async fn read(&mut self, msg: &p9::Tread) -> io::Result<p9::Rread>;
async fn write(&mut self, msg: &p9::Twrite) -> io::Result<p9::Rwrite>;
async fn flush(&mut self, _msg: &p9::Tflush) -> io::Result<()>;
async fn stat(&mut self, msg: &p9::Tgetattr) -> io::Result<p9::Rgetattr>;
}
#[async_trait::async_trait]
trait FileExt: File
where
Self: Sized + Send + Sync + Unpin,
{
async fn read(&mut self, msg: &p9::Tread) -> io::Result<p9::Rread> {
let mut buf = vec![0; msg.count as usize];
let _n = self.read_exact(buf.as_mut_slice()).await?;
Ok(p9::Rread { data: Data(buf) })
}
async fn write(&mut self, msg: &p9::Twrite) -> io::Result<p9::Rwrite> {
self.write_all(&msg.data.0).await?;
Ok(p9::Rwrite {
count: msg.data.0.len() as u32,
})
}
async fn flush(&mut self, _msg: &p9::Tflush) -> io::Result<()> {
AsyncWriteExt::flush(&mut self).await
}
async fn stat(&mut self, _msg: &p9::Tgetattr) -> io::Result<p9::Rgetattr> {
Ok(p9::Rgetattr {
valid: 0,
qid: p9::Qid {
ty: 0,
version: 0,
path: 0,
},
mode: 0,
uid: 0,
gid: 0,
nlink: 0,
rdev: 0,
size: 0,
blksize: 0,
blocks: 0,
atime_sec: 0,
atime_nsec: 0,
mtime_sec: 0,
mtime_nsec: 0,
ctime_sec: 0,
ctime_nsec: 0,
btime_sec: 0,
btime_nsec: 0,
gen: 0,
data_version: 0,
})
}
}
#[async_trait::async_trait]
trait Dir {
async fn open(&mut self, msg: &p9::Tlopen) -> io::Result<p9::Rlopen>;
async fn create(&mut self, msg: &p9::Tlcreate) -> io::Result<p9::Rlcreate>;
async fn remove(&mut self, msg: &p9::Tremove) -> io::Result<()>;
async fn stat(&mut self, msg: &p9::Tgetattr) -> io::Result<p9::Rgetattr>;
}
#[async_trait::async_trait]
trait DirExt: Dir
where
Self: Sized + Send + Sync + Unpin,
{
async fn open(&mut self, _msg: &p9::Tlopen) -> io::Result<p9::Rlopen> {
Ok(p9::Rlopen {
qid: p9::Qid {
ty: 0,
version: 0,
path: 0,
},
iounit: 0,
})
}
async fn create(
&mut self,
_msg: &p9::Tlcreate,
) -> io::Result<p9::Rlcreate> {
Ok(p9::Rlcreate {
qid: p9::Qid {
ty: 0,
version: 0,
path: 0,
},
iounit: 0,
})
}
async fn remove(&mut self, _msg: &p9::Tremove) -> io::Result<()> {
Ok(())
}
async fn stat(&mut self, _msg: &p9::Tgetattr) -> io::Result<p9::Rgetattr> {
Ok(p9::Rgetattr {
valid: 0,
qid: p9::Qid {
ty: 0,
version: 0,
path: 0,
},
mode: 0,
uid: 0,
gid: 0,
nlink: 0,
rdev: 0,
size: 0,
blksize: 0,
blocks: 0,
atime_sec: 0,
atime_nsec: 0,
mtime_sec: 0,
mtime_nsec: 0,
ctime_sec: 0,
ctime_nsec: 0,
btime_sec: 0,
btime_nsec: 0,
gen: 0,
data_version: 0,
})
}
}
enum Node<F: File, D: Dir> {
File(F),
Dir(D),
Empty
}
#[derive(Eq, Clone)]
struct Fid {
inner: u32,
_phantom: PhantomData<()>,
}
impl PartialEq for Fid {
fn eq(&self, other: &Self) -> bool {
self.inner == other.inner
}
}
impl std::hash::Hash for Fid {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.inner.hash(state);
}
}
impl std::cmp::PartialOrd for Fid {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.inner.partial_cmp(&other.inner)
}
}
impl std::cmp::Ord for Fid {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.inner.cmp(&other.inner)
}
}
struct FileSystem<F: File, D: Dir> {
fids: btree_map::BTreeMap<Fid, String>,
nodes: btree_map::BTreeMap<Fid, Node<F, D>>,
}
impl <F: File, D: Dir> FileSystem<F, D> {
fn new() -> Self {
Self {
fids: btree_map::BTreeMap::new(),
nodes: btree_map::BTreeMap::new(),
}
}
}
impl<F: File, D: Dir> FileSystem<F, D> {
async fn attach(&mut self, msg: &p9::Tattach) -> io::Result<p9::Rattach> {
let fid = Fid {
inner: msg.fid,
_phantom: PhantomData,
};
self.fids.insert(fid.clone(), msg.uname.clone());
self.nodes.insert(fid, Node::Empty);
Ok(p9::Rattach {
qid: p9::Qid {
ty: 0,
version: 0,
path: 0,
},
})
}
}

View file

@ -1,8 +0,0 @@
pub mod async_wire_format;
pub mod log;
pub mod server;
pub mod service;
pub use p9::protocol;

View file

@ -1,86 +0,0 @@
use lazy_static::lazy_static;
use parking_lot::Once;
use slog::{slog_o, Drain, Logger};
use slog_scope::GlobalLoggerGuard;
use std::{io::Write, ops::Add, path::PathBuf};
use termcolor::{BufferWriter, Color, ColorChoice, ColorSpec, WriteColor};
// get module colour hashes the module name
// and attempts to return a uniqe color as far as ansi colors go
fn get_module_colour(module: &str) -> Color {
// crc16 is a good hash for this
let hash = crc16::State::<crc16::XMODEM>::calculate(module.as_bytes());
let hash = hash.add(5);
let color = match hash % 6 {
0 => Color::Red,
1 => Color::Green,
2 => Color::Yellow,
3 => Color::Blue,
4 => Color::Magenta,
5 => Color::Cyan,
_ => Color::White,
};
color
}
pub fn setup_logging() -> Logger {
let x = drain();
slog::Logger::root(x, slog_o!())
}
pub fn drain() -> slog::Fuse<slog_term::FullFormat<slog_term::PlainSyncDecorator<std::io::Stdout>>> {
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let ff = slog_term::FullFormat::new(plain);
let x = ff
.use_custom_header_print(|f, t, r, x| {
// print format is: dev.branch/{module} {level} {msg}
// module should be cleaned by :: -> /
// level should be colored use termcolor
let module = r.module().replace("::", "/");
let level = match r.level() {
slog::Level::Critical => termcolor::Color::Red,
slog::Level::Error => termcolor::Color::Red,
slog::Level::Warning => termcolor::Color::Yellow,
slog::Level::Info => termcolor::Color::Green,
slog::Level::Debug => termcolor::Color::Blue,
slog::Level::Trace => termcolor::Color::Cyan,
};
let cargo_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
// drop last component
let mut location_buffer = PathBuf::from_iter(
cargo_path
.components()
.take(cargo_path.components().count() - 1),
);
location_buffer.push(r.file());
let loc = location_buffer.to_str().unwrap();
let bufwtr = BufferWriter::stderr(ColorChoice::Always);
let mut buffer = bufwtr.buffer();
let module_color = get_module_colour(&module);
buffer.set_color(ColorSpec::new().set_fg(Some(module_color)))?;
let _ = write!(buffer, "dev.branch/{} ", module,);
buffer.reset()?;
buffer.set_color(
ColorSpec::new()
.set_dimmed(true)
.set_underline(true)
.set_fg(Some(Color::White)),
)?;
let _ = write!(buffer, "{}:{}", loc.to_string(), r.location().line);
buffer.reset()?;
buffer.set_color(
ColorSpec::new().set_fg(Some(level)).set_intense(true),
)?;
let _ = write!(buffer, " {}", r.level());
buffer.reset()?;
let _ = write!(buffer, " {}", r.msg());
let _ = bufwtr.print(&buffer);
std::result::Result::Ok(true)
})
.build()
.fuse();
x
}

View file

@ -1,58 +0,0 @@
// hugefs is a very FAST and efficient file system designed to scale to very large file systems.
// The current schema uses the build the dumbest version and see where it fails, apply learnings and iterate.
//
// Storage (we are only concerned with storage, transport is already very fast)
// -------
// The dumbest and probably the slowest path to store a data is this,
// storefile(path, data):
// for i, chunk in fastCDC(data).await {
// storechunk(path, i chunk)
// }
//
// storechunk(path, n, chunk):
// hash = blake3(chunk)
// s3.put(hash, chunk)
// db.exec(
// if !path in db:
// db[path] = []
// db[path].insert_at(n, hash)
// )
//
// Something, roughly like this, and the corresponding reads.
use p9::WireFormat;
use crate::P9WireFormat;
#[derive(Debug, P9WireFormat)]
pub struct Id {
data: p9::Data,
}
impl PartialEq for Id {
fn eq(&self, other: &Self) -> bool {
self.data == other.data
}
}
// tests
#[cfg(test)]
mod tests {
use clap::builder::ValueParserFactory;
use super::*;
use std::io::Cursor;
#[test]
fn test_id() {
// sha256("hello")
let id = Id {
data: p9::Data(vec![
0x9b, 0x71, 0x7c, 0x3c, 0x7b, 0x8b, 0x7e, 0x5f, 0x7f, 0x8b,
]),
};
}
}

View file

@ -1,8 +0,0 @@
pub mod ninep_2000_l;
pub mod proxy;
pub mod quic_server;
pub mod ufs;
pub mod x509_fs;
pub mod hugefs;
mod server_tests;

View file

@ -1,182 +0,0 @@
use std::io;
use p9::*;
/// 9p
#[async_trait::async_trait]
pub trait NineP200L: Send + Sync {
/// The version message is the first message sent on a connection. It is used to negotiate the
/// 9P protocol version and maximum message size.
async fn version(
self: &mut Self,
tag: u16,
version: &Tversion,
) -> io::Result<Rversion>;
/// The auth message is used to authenticate a user to the server. It is sent after the version
/// message and before any other messages.
/// The auth message is optional and may be ignored by the server.
async fn auth(self: &mut Self, tag: u16, auth: &Tauth)
-> io::Result<Rauth>;
/// The flush message is used to flush pending I/O requests.
async fn flush(self: &mut Self, tag: u16, flush: &Tflush)
-> io::Result<()>;
/// The walk message is used to traverse the file system hierarchy. It is sent by the client and
/// responded to by the server.
async fn walk(self: &mut Self, tag: u16, walk: &Twalk)
-> io::Result<Rwalk>;
/// The read message is used to read data from a file.
async fn read(self: &mut Self, tag: u16, read: &Tread)
-> io::Result<Rread>;
/// The write message is used to write data to a file.
async fn write(
self: &mut Self,
tag: u16,
write: &Twrite,
) -> io::Result<Rwrite>;
/// The clunk message is used to release a fid.
async fn clunk(self: &mut Self, tag: u16, clunk: &Tclunk)
-> io::Result<()>;
/// The remove message is used to remove a file.
async fn remove(
self: &mut Self,
tag: u16,
remove: &Tremove,
) -> io::Result<()>;
/// The attach message is used to associate a fid with a file.
async fn attach(
self: &mut Self,
tag: u16,
attach: &Tattach,
) -> io::Result<Rattach>;
/// The statfs message is used to retrieve file system information.
async fn statfs(
self: &mut Self,
tag: u16,
statfs: &Tstatfs,
) -> io::Result<Rstatfs>;
/// The lopen message is used to open a file.
async fn lopen(
self: &mut Self,
tag: u16,
lopen: &Tlopen,
) -> io::Result<Rlopen>;
/// The lcreate message is used to create a file.
async fn lcreate(
self: &mut Self,
tag: u16,
lcreate: &Tlcreate,
) -> io::Result<Rlcreate>;
/// The symlink message is used to create a symbolic link.
async fn symlink(
self: &mut Self,
tag: u16,
symlink: &Tsymlink,
) -> io::Result<Rsymlink>;
/// The mknod message is used to create a device file.
async fn mknod(
self: &mut Self,
tag: u16,
mknod: &Tmknod,
) -> io::Result<Rmknod>;
/// The rename message is used to rename a file.
async fn rename(
self: &mut Self,
tag: u16,
rename: &Trename,
) -> io::Result<()>;
/// The readlink message is used to read the target of a symbolic link.
async fn readlink(
self: &mut Self,
tag: u16,
readlink: &Treadlink,
) -> io::Result<Rreadlink>;
/// The getattr message is used to retrieve file attributes.
async fn get_attr(
self: &mut Self,
tag: u16,
get_attr: &Tgetattr,
) -> io::Result<Rgetattr>;
/// The setattr message is used to set file attributes.
async fn set_attr(
self: &mut Self,
tag: u16,
set_attr: &Tsetattr,
) -> io::Result<()>;
/// The xattrwalk message is used to traverse extended attributes.
async fn xattr_walk(
self: &mut Self,
tag: u16,
xattr_walk: &Txattrwalk,
) -> io::Result<Rxattrwalk>;
/// The xattrcreate message is used to create an extended attribute.
async fn xattr_create(
self: &mut Self,
tag: u16,
xattr_create: &Txattrcreate,
) -> io::Result<()>;
/// The readdir message is used to read a directory.
async fn readdir(
self: &mut Self,
tag: u16,
readdir: &Treaddir,
) -> io::Result<Rreaddir>;
/// The fsync message is used to synchronize a file's data and metadata.
async fn fsync(self: &mut Self, tag: u16, fsync: &Tfsync)
-> io::Result<()>;
/// The lock message is used to lock a file.
async fn lock(self: &mut Self, tag: u16, lock: &Tlock)
-> io::Result<Rlock>;
/// The getlock message is used to retrieve a file's locks.
async fn get_lock(
self: &mut Self,
tag: u16,
get_lock: &Tgetlock,
) -> io::Result<Rgetlock>;
/// The link message is used to create a hard link.
async fn link(self: &mut Self, tag: u16, link: &Tlink) -> io::Result<()>;
/// The mkdir message is used to create a directory.
async fn mkdir(
self: &mut Self,
tag: u16,
mkdir: &Tmkdir,
) -> io::Result<Rmkdir>;
/// The renameat message is used to rename a file.
async fn rename_at(
self: &mut Self,
tag: u16,
rename_at: &Trenameat,
) -> io::Result<()>;
/// The unlinkat message is used to remove a file.
async fn unlink_at(
self: &mut Self,
tag: u16,
unlink_at: &Tunlinkat,
) -> io::Result<()>;
}

View file

@ -1,158 +0,0 @@
use crate::async_wire_format::{AsyncWireFormat, AsyncWireFormatExt};
use anyhow::Ok;
use futures_util::AsyncReadExt;
use p9::{Rframe, Tframe, WireFormat};
use s2n_quic::client::{Client, Connect};
use s2n_quic::provider::tls;
use serde::de;
use slog_scope::{debug, error};
use std::io::Write;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use termcolor::{BufferWriter, Color, ColorChoice, ColorSpec, WriteColor};
use tokio::net::UnixStream;
use tokio::sync::Mutex;
use crate::{log, ConvertWireFormat};
#[derive(Debug, Clone)]
pub struct DialQuic {
host: String,
port: u16,
client_cert: Box<Path>,
key: Box<Path>,
ca_cert: Box<Path>,
hostname: String,
}
impl DialQuic {
pub fn new(
host: String,
port: u16,
cert: Box<Path>,
key: Box<Path>,
ca_cert: Box<Path>,
hostname: String,
) -> Self {
Self {
host,
port,
client_cert: cert,
key,
ca_cert,
hostname,
}
}
}
impl DialQuic {
async fn dial(self) -> anyhow::Result<s2n_quic::Connection> {
let ca_cert = self.ca_cert.to_str().unwrap();
let client_cert = self.client_cert.to_str().unwrap();
let client_key = self.key.to_str().unwrap();
let tls = tls::default::Client::builder()
.with_certificate(Path::new(ca_cert))?
.with_client_identity(
Path::new(client_cert),
Path::new(client_key),
)?
.build()?;
let client = Client::builder()
.with_tls(tls)?
.with_io("0.0.0.0:0")?
.start()?;
let host_port = format!("{}:{}", self.host, self.port);
let addr: SocketAddr = host_port.parse()?;
let connect = Connect::new(addr).with_server_name(&*self.hostname);
let mut connection = client.connect(connect).await?;
// ensure the connection doesn't time out with inactivity
connection.keep_alive(true)?;
Ok(connection)
}
}
pub struct Proxy {
dial: DialQuic,
listen: Box<Path>,
}
impl Proxy {
pub fn new(dial: DialQuic, listen: Box<Path>) -> Self {
Self { dial, listen }
}
}
impl Proxy {
pub async fn run(&self) {
debug!("Listening on {:?}", self.listen);
let listener = tokio::net::UnixListener::bind(&self.listen).unwrap();
while let std::result::Result::Ok((mut down_stream, _)) =
listener.accept().await
{
debug!("Accepted connection from {:?}", down_stream.peer_addr());
async move {
let down_stream = down_stream;
let dial = self.dial.clone();
debug!("Dialing {:?}", dial);
let mut dial = dial.clone().dial().await.unwrap();
debug!("Connected to {:?}", dial.remote_addr());
let up_stream = dial.open_bidirectional_stream().await.unwrap();
tokio::task::spawn(async move {
up_stream.connection().ping().unwrap();
let (rx, mut tx) = up_stream.split();
let (read, mut write) = down_stream.into_split();
let mut upstream_reader = tokio::io::BufReader::new(rx);
//let mut upstream_writer = tokio::io::BufWriter::new(tx);
// let mut downstream_writer =
// tokio::io::BufWriter::new(write);
let mut downstream_reader = tokio::io::BufReader::new(read);
loop {
// read and send to up_stream
{
debug!("Reading from down_stream");
let tframe =
Tframe::decode_async(&mut downstream_reader)
.await;
if let Err(e) = tframe {
// if error is eof, break
if e.kind() == std::io::ErrorKind::UnexpectedEof
{
break;
} else {
error!(
"Error decoding from down_stream: {:?}",
e
);
break;
}
} else if let std::io::Result::Ok(tframe) = tframe {
debug!("Sending to up_stream {:?}", tframe);
let _ =
tframe.encode_async(&mut tx).await.unwrap();
}
}
// write and send to down_stream
{
debug!("Reading from up_stream");
let rframe =
Rframe::decode_async(&mut upstream_reader)
.await
.unwrap();
debug!("Sending to down_stream");
rframe.encode_async(&mut write).await.unwrap();
}
}
});
}
.await;
}
}
}

View file

@ -1,113 +0,0 @@
use std::{path::Path, sync::Arc};
use bytes::Bytes;
use futures_util::AsyncReadExt;
use p9::{Tframe, WireFormat};
use s2n_quic::{provider::tls, Server};
use serde::de;
use slog_scope::{debug, error};
use tokio::sync::Mutex;
use tower::Service;
use crate::{
async_wire_format::AsyncWireFormatExt,
log::{self, setup_logging},
JetStreamService, Message, NinePService, NinePServiceImpl, Radar,
};
pub struct QuicServer<
Req: Message,
Resp: Message,
S: JetStreamService<Req, Resp>,
> {
svc: S,
_ghost: std::marker::PhantomData<(Req, Resp)>,
}
impl<Req: Message, Resp: Message, S: JetStreamService<Req, Resp>>
QuicServer<Req, Resp, S>
{
pub fn new(svc: S) -> Self {
Self {
svc,
_ghost: std::marker::PhantomData,
}
}
}
impl<
Req: Message,
Resp: Message,
T: JetStreamService<Req, Resp> + Clone + 'static,
> QuicServer<Req, Resp, T>
{
pub async fn serve(mut self, mut server: Server) -> anyhow::Result<()> {
debug!("Server started");
while let Some(mut connection) = server.accept().await {
debug!("Connection opened from {:?}", connection.remote_addr());
let svc = self.svc.clone();
// spawn a new task for the connection
tokio::spawn(async move {
debug!("Connection opened from {:?}", connection.remote_addr());
let svc = svc.clone();
while let Ok(Some(stream)) =
connection.accept_bidirectional_stream().await
{
// spawn a new task for the stream
let svc = svc.clone();
tokio::spawn(async move {
debug!("Stream opened");
// echo any data back to the stream
let (read, mut write) = stream.split();
// let mut downstream_writer =
// tokio::io::BufWriter::new(write);
let mut downstream_reader =
tokio::io::BufReader::new(read);
let mut svc = svc.clone();
loop {
// read and send to up_stream
{
debug!("Reading from down_stream");
let tframe =
Req::decode_async(&mut downstream_reader)
.await;
//debug!("got tframe: {:?}", tframe);
if let Err(e) = tframe {
// if error is eof, break
if e.kind()
== std::io::ErrorKind::UnexpectedEof
{
break;
} else {
error!(
"Error decoding from down_stream: {:?}",
e
);
break;
}
} else if let std::io::Result::Ok(tframe) =
tframe
{
debug!("Sending to up_stream");
let rframe =
svc.clone().call(tframe).await.unwrap();
//debug!("got rframe: {:?}", rframe);
debug!("writing to down_stream");
rframe
.encode_async(&mut write)
.await
.unwrap();
write.flush().await.unwrap();
}
}
}
});
}
});
}
Ok(())
}
}

View file

@ -1,237 +0,0 @@
#[cfg(test)]
mod tests {
use std::path::{self, Path};
use std::sync::Arc;
use p9::{Rframe, Tversion};
use p9::{Tframe, Tmessage, WireFormat};
use s2n_quic::provider::tls;
use s2n_quic::Server;
use serde::de;
use slog_scope::debug;
use tokio::io::AsyncWriteExt;
use tokio::runtime;
use tokio::sync::Barrier;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tokio_util::compat::TokioAsyncWriteCompatExt;
use crate::async_wire_format::AsyncWireFormatExt;
use crate::log::{drain, setup_logging};
use crate::ninepecho::{self, EchoService};
use crate::{
server::{
proxy::{self, DialQuic, Proxy},
quic_server::QuicServer,
},
NinePServiceImpl, Radar,
};
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_quic_server_unix_socket_proxy() {
let _guard = slog_scope::set_global_logger(setup_logging());
let _guard = slog_envlogger::new(drain());
let (tx, mut rx) = tokio::io::duplex(1024);
pub static CA_CERT_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/ca-cert.pem");
pub static SERVER_CERT_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-cert.pem");
pub static SERVER_KEY_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-key.pem");
pub static CLIENT_CERT_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-cert.pem");
pub static CLIENT_KEY_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-key.pem");
let tls = tls::default::Server::builder()
.with_trusted_certificate(Path::new(CA_CERT_PEM))
.unwrap()
.with_certificate(
Path::new(SERVER_CERT_PEM),
Path::new(SERVER_KEY_PEM),
)
.unwrap()
.with_client_authentication()
.unwrap()
.build()
.unwrap();
let barrier = Arc::new(Barrier::new(3)).clone();
let c = barrier.clone();
let srv_handle = tokio::spawn(async move {
let mut server = Server::builder()
.with_tls(tls)
.unwrap()
.with_io("127.0.0.1:4433")
.unwrap()
.start()
.unwrap();
let qsrv: QuicServer<Tframe, Rframe, EchoService> =
QuicServer::new(ninepecho::EchoService);
debug!("Server started, waiting for barrier");
c.wait().await;
let _ = qsrv.serve(server).await;
});
let cert = path::PathBuf::from(CLIENT_CERT_PEM).into_boxed_path();
let key = path::PathBuf::from(CLIENT_KEY_PEM).into_boxed_path();
let ca_cert = path::PathBuf::from(CA_CERT_PEM).into_boxed_path();
let temp_dir = tmpdir::TmpDir::new("q9p").await.unwrap();
let mut listen = temp_dir.to_path_buf();
listen.push("q9p.sock");
let listen = listen.into_boxed_path();
let l = listen.clone();
let c = barrier.clone();
let prxy_handle = tokio::spawn(async move {
debug!("Proxy started, waiting for barrier");
c.wait().await;
let prxy = Proxy::new(
DialQuic::new(
"127.0.0.1".to_string(),
4433,
cert,
key,
ca_cert,
"localhost".to_string(),
),
l.clone(),
);
let _ = prxy.run().await;
});
let c = barrier.clone();
let l = listen.clone();
let client_handle = tokio::spawn(async move {
c.clone().wait().await;
// sleep for 5 milliseconds to give the server time to start
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
debug!("Connecting to {:?}", listen);
let (mut read, mut write) = tokio::net::UnixStream::connect(l)
.await
.unwrap()
.into_split();
// loop 100 times
for _ in 0..100 {
let test = Tframe {
tag: 0,
msg: Ok(Tmessage::Version(Tversion {
msize: 8192,
version: "9P2000.L".to_string(),
})),
};
debug!("Sending tframe: {:?}", test);
// ping
test.encode_async(&mut write).await.unwrap();
write.flush().await.unwrap();
debug!("Reading rframe");
read.readable().await.unwrap();
// pong
let resp = Rframe::decode_async(&mut read).await.unwrap();
assert_eq!(resp.tag, 0);
}
});
let timeout = std::time::Duration::from_secs(10);
let timeout = tokio::time::sleep(timeout);
tokio::select! {
_ = timeout => {
panic!("Timeout");
}
_ = srv_handle => {
panic!("Quic server failed");
}
_ = prxy_handle => {
panic!("Proxy failed");
}
_ = client_handle => {
return;
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_quic_server_unix_socket_proxy_service() {
let _guard = slog_scope::set_global_logger(setup_logging());
let _guard = slog_envlogger::new(drain());
let (tx, mut rx) = tokio::io::duplex(1024);
pub static CA_CERT_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/ca-cert.pem");
pub static SERVER_CERT_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-cert.pem");
pub static SERVER_KEY_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-key.pem");
pub static CLIENT_CERT_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-cert.pem");
pub static CLIENT_KEY_PEM: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-key.pem");
let tls = tls::default::Server::builder()
.with_trusted_certificate(Path::new(CA_CERT_PEM))
.unwrap()
.with_certificate(
Path::new(SERVER_CERT_PEM),
Path::new(SERVER_KEY_PEM),
)
.unwrap()
.with_client_authentication()
.unwrap()
.build()
.unwrap();
let barrier = Arc::new(Barrier::new(3)).clone();
let c = barrier.clone();
let srv_handle = tokio::spawn(async move {
});
let cert = path::PathBuf::from(CLIENT_CERT_PEM).into_boxed_path();
let key = path::PathBuf::from(CLIENT_KEY_PEM).into_boxed_path();
let ca_cert = path::PathBuf::from(CA_CERT_PEM).into_boxed_path();
let temp_dir = tmpdir::TmpDir::new("q9p").await.unwrap();
let mut listen = temp_dir.to_path_buf();
listen.push("q9p.sock");
let listen = listen.into_boxed_path();
let l = listen.clone();
let c = barrier.clone();
let prxy_handle = tokio::spawn(async move {
debug!("Proxy started, waiting for barrier");
c.wait().await;
let prxy = Proxy::new(
DialQuic::new(
"127.0.0.1".to_string(),
4433,
cert,
key,
ca_cert,
"localhost".to_string(),
),
l.clone(),
);
let _ = prxy.run().await;
});
let c = barrier.clone();
let l = listen.clone();
let timeout = std::time::Duration::from_secs(10);
let timeout = tokio::time::sleep(timeout);
tokio::select! {
_ = timeout => {
panic!("Timeout");
}
_ = srv_handle => {
panic!("Quic server failed");
}
_ = prxy_handle => {
panic!("Proxy failed");
}
}
}
}

View file

@ -1,94 +0,0 @@
use std::{
cell::{Cell, RefCell},
collections::{btree_map, BTreeMap},
path::{Path, PathBuf},
rc::Rc,
sync::Arc,
};
use p9::{server::Server, Rframe, Rmessage, Tframe, Tmessage};
use tokio::{sync::oneshot::*, task::JoinHandle};
use crate::{NinePService, Message, JetStreamService};
pub struct Handle {
tframe: Tframe,
reply_to: tokio::sync::oneshot::Sender<Rframe>,
}
pub struct Ufs {
sender: tokio::sync::mpsc::UnboundedSender<Handle>,
processor: tokio::sync::mpsc::UnboundedReceiver<Handle>,
server: Server,
}
impl Ufs {
pub fn new(path: PathBuf) -> Self {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Handle>();
Self {
sender: tx,
processor: rx,
server: Server::new(
path,
btree_map::BTreeMap::new(),
btree_map::BTreeMap::new(),
)
.unwrap(),
}
}
pub fn get_handler(&self) -> Handler {
Handler {
tx: self.sender.clone(),
}
}
}
impl Ufs {
pub async fn run(&mut self) -> anyhow::Result<()> {
while let Some(handle) = self.processor.recv().await {
let tframe = handle.tframe;
let reply_to = handle.reply_to;
let rframe = self
.server
.handle(&tframe)
.await
.unwrap();
reply_to.send(rframe).unwrap();
}
Ok(())
}
}
#[derive(Clone)]
pub struct Handler {
tx: tokio::sync::mpsc::UnboundedSender<Handle>,
}
impl Message for Tframe {}
impl Message for Rframe {}
impl JetStreamService<Tframe, Rframe> for Handler {
fn call(
&mut self,
req: p9::Tframe,
) -> std::pin::Pin<
Box<
dyn futures::prelude::Future<
Output = Result<
p9::Rframe,
Box<dyn std::error::Error + Send + Sync>,
>,
> + Send,
>,
> {
let (reply, result) = tokio::sync::oneshot::channel::<Rframe>();
self.tx
.send(Handle {
tframe: req,
reply_to: reply,
})
.unwrap();
Box::pin(async { result.await.map_err(|e| e.into()) })
}
}

View file

@ -1,629 +0,0 @@
use std::io;
use p9::messages::*;
use crate::server::ninep_2000_l::NineP200L;
struct X509Fs;
impl NineP200L for X509Fs {
/// The version message is the first message sent on a connection. It is used to negotiate the
/// 9P protocol version and maximum message size.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn version<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
version: &'life1 Tversion,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rversion>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The auth message is used to authenticate a user to the server. It is sent after the version
/// message and before any other messages.
/// The auth message is optional and may be ignored by the server.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn auth<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
auth: &'life1 Tauth,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rauth>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The flush message is used to flush pending I/O requests.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn flush<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
flush: &'life1 Tflush,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The walk message is used to traverse the file system hierarchy. It is sent by the client and
/// responded to by the server.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn walk<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
walk: &'life1 Twalk,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rwalk>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The read message is used to read data from a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn read<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
read: &'life1 Tread,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rread>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The write message is used to write data to a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn write<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
write: &'life1 Twrite,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rwrite>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The clunk message is used to release a fid.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn clunk<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
clunk: &'life1 Tclunk,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The remove message is used to remove a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn remove<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
remove: &'life1 Tremove,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The attach message is used to associate a fid with a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn attach<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
attach: &'life1 Tattach,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rattach>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The statfs message is used to retrieve file system information.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn statfs<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
statfs: &'life1 Tstatfs,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rstatfs>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The lopen message is used to open a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn lopen<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
lopen: &'life1 Tlopen,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rlopen>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The lcreate message is used to create a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn lcreate<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
lcreate: &'life1 Tlcreate,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rlcreate>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The symlink message is used to create a symbolic link.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn symlink<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
symlink: &'life1 Tsymlink,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rsymlink>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The mknod message is used to create a device file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn mknod<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
mknod: &'life1 Tmknod,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rmknod>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The rename message is used to rename a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn rename<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
rename: &'life1 Trename,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The readlink message is used to read the target of a symbolic link.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn readlink<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
readlink: &'life1 Treadlink,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rreadlink>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The getattr message is used to retrieve file attributes.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn get_attr<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
get_attr: &'life1 Tgetattr,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rgetattr>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The setattr message is used to set file attributes.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn set_attr<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
set_attr: &'life1 Tsetattr,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The xattrwalk message is used to traverse extended attributes.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn xattr_walk<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
xattr_walk: &'life1 Txattrwalk,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rxattrwalk>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The xattrcreate message is used to create an extended attribute.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn xattr_create<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
xattr_create: &'life1 Txattrcreate,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The readdir message is used to read a directory.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn readdir<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
readdir: &'life1 Treaddir,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rreaddir>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The fsync message is used to synchronize a file\'s data and metadata.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn fsync<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
fsync: &'life1 Tfsync,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The lock message is used to lock a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn lock<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
lock: &'life1 Tlock,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rlock>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The getlock message is used to retrieve a file\'s locks.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn get_lock<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
get_lock: &'life1 Tgetlock,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rgetlock>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The link message is used to create a hard link.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn link<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
link: &'life1 Tlink,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The mkdir message is used to create a directory.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn mkdir<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
mkdir: &'life1 Tmkdir,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<Rmkdir>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The renameat message is used to rename a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn rename_at<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
rename_at: &'life1 Trenameat,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
/// The unlinkat message is used to remove a file.
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn unlink_at<'life0, 'life1, 'async_trait>(
self: &'life0 mut Self,
tag: u16,
unlink_at: &'life1 Tunlinkat,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = io::Result<()>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
}

View file

@ -1,183 +0,0 @@
use std::{
error::Error,
io::{Read, Write},
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use async_wire_format::AsyncWireFormatExt;
use bytes::{BufMut, Bytes, BytesMut};
use futures::prelude::*;
use p9::{Rframe, Rmessage, Tframe, Tmessage, WireFormat};
pub use p9_wire_format_derive::P9WireFormat;
use tower::Service;
pub trait Message: WireFormat + Send + Sync {}
/// A trait for implementing a 9P service.
/// This trait is implemented for types that can handle 9P requests.
pub trait JetStreamService<Req: Message, Resp: Message>:
Send + Sync + Sized
{
fn call(
&mut self,
req: Req,
) -> Pin<
Box<
dyn Future<Output = Result<Resp, Box<dyn Error + Send + Sync>>>
+ Send,
>,
>;
}
/// A trait for implementing a 9P service.
/// This trait is implemented for types that can handle 9P requests.
pub trait NinePService:
JetStreamService<Tframe, Rframe> + Send + Sync + Clone + Clone
{
}
/// A service that implements the 9P protocol.
#[derive(Debug, Clone, Copy)]
pub struct NinePServiceImpl<S: NinePService> {
inner: S,
}
impl<S: NinePService> NinePServiceImpl<S> {
pub fn new(inner: S) -> Self {
NinePServiceImpl { inner }
}
}
impl<S: NinePService> JetStreamService<Tframe, Rframe> for NinePServiceImpl<S> {
fn call(
&mut self,
req: Tframe,
) -> Pin<
Box<
dyn Future<Output = Result<Rframe, Box<dyn Error + Send + Sync>>>
+ Send,
>,
> {
self.inner.call(req)
}
}
/// A static 9p service that always returns a version message.
#[derive(Debug, Clone, Copy)]
pub struct Radar;
#[derive(Debug, Clone, P9WireFormat)]
struct Ping(u8);
impl Message for Ping {}
#[derive(Debug, Clone, P9WireFormat)]
struct Pong(u8);
impl Message for Pong {}
impl JetStreamService<Ping, Pong> for Radar {
fn call(
&mut self,
req: Ping,
) -> Pin<
Box<
dyn Future<Output = Result<Pong, Box<dyn Error + Send + Sync>>>
+ Send,
>,
> {
Box::pin(async move { Ok(Pong(req.0)) })
}
}
mod ninepecho {
use super::*;
#[derive(Debug, Clone, Copy)]
pub struct EchoService;
impl JetStreamService<Tframe, Rframe> for EchoService {
fn call(
&mut self,
req: Tframe,
) -> Pin<
Box<
dyn Future<
Output = Result<Rframe, Box<dyn Error + Send + Sync>>,
> + Send,
>,
> {
Box::pin(async move {
Ok(Rframe {
tag: 0,
msg: Rmessage::Version(p9::Rversion {
msize: 0,
version: "9P2000".to_string(),
}),
})
})
}
}
}
struct Echo;
impl Service<bytes::Bytes> for Echo {
type Error = Box<dyn Error + Send + Sync>;
type Future = Pin<
Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>,
>;
type Response = bytes::Bytes;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: bytes::Bytes) -> Self::Future {
Box::pin(async move { Ok(req) })
}
}
/// A trait for converting types to and from a wire format.
pub trait ConvertWireFormat: WireFormat {
/// Converts the type to a byte representation.
///
/// # Returns
///
/// A `Bytes` object representing the byte representation of the type.
fn to_bytes(&self) -> Bytes;
/// Converts a byte buffer to the type.
///
/// # Arguments
///
/// * `buf` - A mutable reference to a `Bytes` object containing the byte buffer.
///
/// # Returns
///
/// A `Result` containing the converted type or an `std::io::Error` if the conversion fails.
fn from_bytes(buf: &mut Bytes) -> Result<Self, std::io::Error>;
}
impl<T: p9::WireFormat> ConvertWireFormat for T {
fn to_bytes(&self) -> Bytes {
let mut buf = vec![];
let res = self.encode(&mut buf);
if let Err(e) = res {
panic!("Failed to encode: {}", e);
}
let mut bytes = BytesMut::new();
bytes.put_slice(buf.as_slice());
bytes.freeze()
}
fn from_bytes(buf: &mut Bytes) -> Result<Self, std::io::Error> {
let buf = buf.to_vec();
T::decode(&mut buf.as_slice())
}
}

View file

@ -1,167 +0,0 @@
pub(crate) enum Version {
V9P2000 = 0,
V9P2000U = 1,
V9P2000L = 2,
V9P2000Lu = 3,
V9P2024q9p = 4,
}
impl From<&str> for Version {
fn from(version: &str) -> Self {
match version {
"9P2000" => Version::V9P2000,
"9P2000.u" => Version::V9P2000U,
"9P2000.L" => Version::V9P2000L,
"9P2000.Lu" => Version::V9P2000Lu,
"9P2024.q9p" => Version::V9P2024q9p,
_ => panic!("Invalid 9P version: {}", version),
}
}
}
impl From<String> for Version {
fn from(version: String) -> Self {
match version.as_str() {
"9P2000" => Version::V9P2000,
"9P2000.u" => Version::V9P2000U,
"9P2000.L" => Version::V9P2000L,
"9P2000.Lu" => Version::V9P2000Lu,
"9P2024.q9p" => Version::V9P2024q9p,
_ => panic!("Invalid 9P version: {}", version),
}
}
}
impl Into<String> for Version {
fn into(self) -> String {
match self {
Version::V9P2000 => "9P2000".to_string(),
Version::V9P2000U => "9P2000.u".to_string(),
Version::V9P2000L => "9P2000.L".to_string(),
Version::V9P2000Lu => "9P2000.Lu".to_string(),
Version::V9P2024q9p => "9P2024.q9p".to_string(),
}
}
}
impl Into<&str> for Version {
fn into(self) -> &'static str {
match self {
Version::V9P2000 => "9P2000",
Version::V9P2000U => "9P2000.u",
Version::V9P2000L => "9P2000.L",
Version::V9P2000Lu => "9P2000.Lu",
Version::V9P2024q9p => "9P2024.q9p",
}
}
}
impl Version {
pub(crate) fn from_str(version: &str) -> Option<Self> {
match version {
"9P2000" => Some(Version::V9P2000),
"9P2000.u" => Some(Version::V9P2000U),
"9P2000.L" => Some(Version::V9P2000L),
"9P2000.Lu" => Some(Version::V9P2000Lu),
"9P2024.q9p" => Some(Version::V9P2024q9p),
_ => None,
}
}
}
pub(crate) const DEFAULT_MSIZE: u32 = 8192;
// Tlopen and Tlcreate flags. Taken from "include/net/9p/9p.h" in the linux tree.
pub(crate) const P9_RDONLY: u32 = 0o00000000;
pub(crate) const P9_WRONLY: u32 = 0o00000001;
pub(crate) const P9_RDWR: u32 = 0o00000002;
pub(crate) const P9_NOACCESS: u32 = 0o00000003;
pub(crate) const P9_CREATE: u32 = 0o00000100;
pub(crate) const P9_EXCL: u32 = 0o00000200;
pub(crate) const P9_NOCTTY: u32 = 0o00000400;
pub(crate) const P9_TRUNC: u32 = 0o00001000;
pub(crate) const P9_APPEND: u32 = 0o00002000;
pub(crate) const P9_NONBLOCK: u32 = 0o00004000;
pub(crate) const P9_DSYNC: u32 = 0o00010000;
pub(crate) const P9_FASYNC: u32 = 0o00020000;
pub(crate) const P9_DIRECT: u32 = 0o00040000;
pub(crate) const P9_LARGEFILE: u32 = 0o00100000;
pub(crate) const P9_DIRECTORY: u32 = 0o00200000;
pub(crate) const P9_NOFOLLOW: u32 = 0o00400000;
pub(crate) const P9_NOATIME: u32 = 0o01000000;
pub(crate) const _P9_CLOEXEC: u32 = 0o02000000;
pub(crate) const P9_SYNC: u32 = 0o04000000;
// Mapping from 9P flags to libc flags.
pub(crate) const MAPPED_FLAGS: [(u32, i32); 16] = [
(P9_WRONLY, libc::O_WRONLY),
(P9_RDWR, libc::O_RDWR),
(P9_CREATE, libc::O_CREAT),
(P9_EXCL, libc::O_EXCL),
(P9_NOCTTY, libc::O_NOCTTY),
(P9_TRUNC, libc::O_TRUNC),
(P9_APPEND, libc::O_APPEND),
(P9_NONBLOCK, libc::O_NONBLOCK),
(P9_DSYNC, libc::O_DSYNC),
(P9_FASYNC, 0), // Unsupported
(P9_DIRECT, libc::O_DIRECT),
(P9_LARGEFILE, libc::O_LARGEFILE),
(P9_DIRECTORY, libc::O_DIRECTORY),
(P9_NOFOLLOW, libc::O_NOFOLLOW),
(P9_NOATIME, libc::O_NOATIME),
(P9_SYNC, libc::O_SYNC),
];
// 9P Qid types. Taken from "include/net/9p/9p.h" in the linux tree.
pub(crate) const P9_QTDIR: u8 = 0x80;
pub(crate) const _P9_QTAPPEND: u8 = 0x40;
pub(crate) const _P9_QTEXCL: u8 = 0x20;
pub(crate) const _P9_QTMOUNT: u8 = 0x10;
pub(crate) const _P9_QTAUTH: u8 = 0x08;
pub(crate) const _P9_QTTMP: u8 = 0x04;
pub(crate) const P9_QTSYMLINK: u8 = 0x02;
pub(crate) const _P9_QTLINK: u8 = 0x01;
pub(crate) const P9_QTFILE: u8 = 0x00;
// Bitmask values for the getattr request.
pub(crate) const _P9_GETATTR_MODE: u64 = 0x00000001;
pub(crate) const _P9_GETATTR_NLINK: u64 = 0x00000002;
pub(crate) const _P9_GETATTR_UID: u64 = 0x00000004;
pub(crate) const _P9_GETATTR_GID: u64 = 0x00000008;
pub(crate) const _P9_GETATTR_RDEV: u64 = 0x00000010;
pub(crate) const _P9_GETATTR_ATIME: u64 = 0x00000020;
pub(crate) const _P9_GETATTR_MTIME: u64 = 0x00000040;
pub(crate) const _P9_GETATTR_CTIME: u64 = 0x00000080;
pub(crate) const _P9_GETATTR_INO: u64 = 0x00000100;
pub(crate) const _P9_GETATTR_SIZE: u64 = 0x00000200;
pub(crate) const _P9_GETATTR_BLOCKS: u64 = 0x00000400;
pub(crate) const _P9_GETATTR_BTIME: u64 = 0x00000800;
pub(crate) const _P9_GETATTR_GEN: u64 = 0x00001000;
pub(crate) const _P9_GETATTR_DATA_VERSION: u64 = 0x00002000;
pub(crate) const P9_GETATTR_BASIC: u64 = 0x000007ff; /* Mask for fields up to BLOCKS */
pub(crate) const _P9_GETATTR_ALL: u64 = 0x00003fff; /* Mask for All fields above */
// Bitmask values for the setattr request.
pub(crate) const P9_SETATTR_MODE: u32 = 0x00000001;
pub(crate) const P9_SETATTR_UID: u32 = 0x00000002;
pub(crate) const P9_SETATTR_GID: u32 = 0x00000004;
pub(crate) const P9_SETATTR_SIZE: u32 = 0x00000008;
pub(crate) const P9_SETATTR_ATIME: u32 = 0x00000010;
pub(crate) const P9_SETATTR_MTIME: u32 = 0x00000020;
pub(crate) const P9_SETATTR_CTIME: u32 = 0x00000040;
pub(crate) const P9_SETATTR_ATIME_SET: u32 = 0x00000080;
pub(crate) const P9_SETATTR_MTIME_SET: u32 = 0x00000100;
// 9p lock constants. Taken from "include/net/9p/9p.h" in the linux kernel.
pub(crate) const _P9_LOCK_TYPE_RDLCK: u8 = 0;
pub(crate) const _P9_LOCK_TYPE_WRLCK: u8 = 1;
pub(crate) const P9_LOCK_TYPE_UNLCK: u8 = 2;
pub(crate) const _P9_LOCK_FLAGS_BLOCK: u8 = 1;
pub(crate) const _P9_LOCK_FLAGS_RECLAIM: u8 = 2;
pub(crate) const P9_LOCK_SUCCESS: u8 = 0;
pub(crate) const _P9_LOCK_BLOCKED: u8 = 1;
pub(crate) const _P9_LOCK_ERROR: u8 = 2;
pub(crate) const _P9_LOCK_GRACE: u8 = 3;
// Minimum and maximum message size that we'll expect from the client.
pub(crate) const MIN_MESSAGE_SIZE: u32 = 256;
pub(crate) const MAX_MESSAGE_SIZE: u32 = 64 * 1024 + 24; // 64 KiB of payload plus some extra for the header