161 lines
4.9 KiB
Markdown
161 lines
4.9 KiB
Markdown
<img src="logo/JetStream.png" style="width: 200px">
|
|
|
|
# JetStream
|
|
|
|
[![crates.io](https://img.shields.io/crates/v/jetstream.svg)](https://crates.io/crates/jetstream) [![docs.rs](https://docs.rs/jetstream/badge.svg)](https://docs.rs/jetstream) <!--gh actions--> ![Build Status](https://github.com/sevki/jetstream/actions/workflows/rust.yml/badge.svg) ![Build Status](https://github.com/sevki/jetstream/actions/workflows/release.yml/badge.svg) [![crates.io downloads](https://img.shields.io/crates/d/jetstream.svg)](https://crates.io/crates/jetstream)
|
|
|
|
JetStream is an RPC framework built on top of [s2n-quic](https://crates.io/crates/s2n-quic) and [p9](https://crates.io/crates/p9). It's designed to be a high performance, low latency, secure, and reliable RPC framework.
|
|
|
|
Features:
|
|
|
|
- Bidirectional streaming
|
|
- 0-RTT
|
|
- [mTLS](https://github.com/aws/s2n-quic/tree/main/examples/s2n-mtls)
|
|
- binary encoding
|
|
|
|
## Motivation
|
|
|
|
Building remote filesystems over internet, is the main motivation behind JetStream.
|
|
|
|
## Ready?
|
|
|
|
JetStream is not ready for production use. It's still in the early stages of development.
|
|
|
|
## Alternatives
|
|
|
|
- [grpc](https://grpc.io/)
|
|
- [capnproto](https://capnproto.org/)
|
|
- [thrift](https://thrift.apache.org/)
|
|
- [jsonrpc](https://www.jsonrpc.org/)
|
|
- [tarpc](https://crates.io/crates/tarpc)
|
|
|
|
## Example [test](src/server/server_tests.rs)
|
|
|
|
```rust
|
|
let _guard = slog_scope::set_global_logger(setup_logging());
|
|
let _guard = slog_envlogger::new(drain());
|
|
let (_tx, _rx) = tokio::io::duplex(1024);
|
|
pub static CA_CERT_PEM: &str =
|
|
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/ca-cert.pem");
|
|
pub static SERVER_CERT_PEM: &str =
|
|
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-cert.pem");
|
|
pub static SERVER_KEY_PEM: &str =
|
|
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-key.pem");
|
|
pub static CLIENT_CERT_PEM: &str =
|
|
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-cert.pem");
|
|
pub static CLIENT_KEY_PEM: &str =
|
|
concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-key.pem");
|
|
|
|
let tls = tls::default::Server::builder()
|
|
.with_trusted_certificate(Path::new(CA_CERT_PEM))
|
|
.unwrap()
|
|
.with_certificate(
|
|
Path::new(SERVER_CERT_PEM),
|
|
Path::new(SERVER_KEY_PEM),
|
|
)
|
|
.unwrap()
|
|
.with_client_authentication()
|
|
.unwrap()
|
|
.build()
|
|
.unwrap();
|
|
|
|
let barrier = Arc::new(Barrier::new(3)).clone();
|
|
let c = barrier.clone();
|
|
let srv_handle = tokio::spawn(async move {
|
|
let server = Server::builder()
|
|
.with_tls(tls)
|
|
.unwrap()
|
|
.with_io("127.0.0.1:4433")
|
|
.unwrap()
|
|
.start()
|
|
.unwrap();
|
|
let qsrv: QuicServer<Tframe, Rframe, EchoService> =
|
|
QuicServer::new(ninepecho::EchoService);
|
|
debug!("Server started, waiting for barrier");
|
|
c.wait().await;
|
|
let _ = qsrv.serve(server).await;
|
|
});
|
|
|
|
let cert = path::PathBuf::from(CLIENT_CERT_PEM).into_boxed_path();
|
|
let key = path::PathBuf::from(CLIENT_KEY_PEM).into_boxed_path();
|
|
let ca_cert = path::PathBuf::from(CA_CERT_PEM).into_boxed_path();
|
|
let temp_dir = tmpdir::TmpDir::new("q9p").await.unwrap();
|
|
|
|
let mut listen = temp_dir.to_path_buf();
|
|
listen.push("q9p.sock");
|
|
let listen = listen.into_boxed_path();
|
|
let l = listen.clone();
|
|
let c = barrier.clone();
|
|
let prxy_handle = tokio::spawn(async move {
|
|
debug!("Proxy started, waiting for barrier");
|
|
c.wait().await;
|
|
|
|
let prxy = Proxy::new(
|
|
DialQuic::new(
|
|
"127.0.0.1".to_string(),
|
|
4433,
|
|
cert,
|
|
key,
|
|
ca_cert,
|
|
"localhost".to_string(),
|
|
),
|
|
l.clone(),
|
|
);
|
|
let _ = prxy.run().await;
|
|
});
|
|
let c = barrier.clone();
|
|
let l = listen.clone();
|
|
let client_handle = tokio::spawn(async move {
|
|
c.clone().wait().await;
|
|
// sleep for 5 milliseconds to give the server time to start
|
|
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
|
debug!("Connecting to {:?}", listen);
|
|
let (mut read, mut write) = tokio::net::UnixStream::connect(l)
|
|
.await
|
|
.unwrap()
|
|
.into_split();
|
|
|
|
// loop 100 times
|
|
for _ in 0..100 {
|
|
let test = Tframe {
|
|
tag: 0,
|
|
msg: Ok(Tmessage::Version(Tversion {
|
|
msize: 8192,
|
|
version: "9P2000.L".to_string(),
|
|
})),
|
|
};
|
|
debug!("Sending tframe: {:?}", test);
|
|
// ping
|
|
test.encode_async(&mut write).await.unwrap();
|
|
write.flush().await.unwrap();
|
|
debug!("Reading rframe");
|
|
read.readable().await.unwrap();
|
|
// pong
|
|
let resp = Rframe::decode_async(&mut read).await.unwrap();
|
|
assert_eq!(resp.tag, 0);
|
|
}
|
|
});
|
|
|
|
let timeout = std::time::Duration::from_secs(10);
|
|
|
|
let timeout = tokio::time::sleep(timeout);
|
|
|
|
tokio::select! {
|
|
_ = timeout => {
|
|
panic!("Timeout");
|
|
}
|
|
_ = srv_handle => {
|
|
panic!("Quic server failed");
|
|
}
|
|
_ = prxy_handle => {
|
|
panic!("Proxy failed");
|
|
}
|
|
_ = client_handle => {
|
|
return;
|
|
}
|
|
}
|
|
```
|
|
|
|
## [License](LICENSE)
|
|
|
|
BSD-3-Clause
|