mirror of
https://github.com/Lencerf/p9cpu.git
synced 2024-10-23 05:06:23 +00:00
p9cpud: implement the server
Signed-off-by: Changyuan Lyu <changyuanl@google.com>
This commit is contained in:
parent
517e90be56
commit
ecfd90d802
13 changed files with 501 additions and 52 deletions
15
Cargo.lock
generated
15
Cargo.lock
generated
|
@ -479,6 +479,17 @@ version = "1.0.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
|
||||
|
||||
[[package]]
|
||||
name = "justerror"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "befab2078d3ff679889e32730a1e9107b06a60a18ed7dfa3384f66ebe5e1062a"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy_static"
|
||||
version = "1.4.0"
|
||||
|
@ -497,6 +508,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"async-trait",
|
||||
"futures",
|
||||
"justerror",
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.26.2",
|
||||
|
@ -636,6 +648,7 @@ dependencies = [
|
|||
"anyhow",
|
||||
"clap",
|
||||
"libp9cpu",
|
||||
"nix 0.26.2",
|
||||
"tokio",
|
||||
"tokio-vsock",
|
||||
]
|
||||
|
@ -648,6 +661,8 @@ dependencies = [
|
|||
"clap",
|
||||
"libp9cpu",
|
||||
"tokio",
|
||||
"tokio-vsock",
|
||||
"vsock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -17,6 +17,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread","process","io-s
|
|||
tokio-vsock = { version = "0.4", features = ["tonic-conn"] }
|
||||
libc = "0.2.142"
|
||||
thiserror = "1"
|
||||
justerror = "1"
|
||||
uuid = { version = "1", features = ["v4", "fast-rng", "macro-diagnostics"] }
|
||||
log = "0.4"
|
||||
tokio-stream = { version = "0.1.12", features = ["net"] }
|
||||
|
|
|
@ -23,4 +23,6 @@ message Cmd {
|
|||
bool ninep = 5;
|
||||
bool tty = 6;
|
||||
string tmp_mnt = 7;
|
||||
uint32 uid = 8;
|
||||
uint32 gid = 9;
|
||||
}
|
||||
|
|
|
@ -93,4 +93,10 @@ impl Command {
|
|||
self.cmd.tmp_mnt = tmp_mnt;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn ugid(&mut self, uid: u32, gid: u32) -> &mut Self {
|
||||
self.cmd.uid = uid;
|
||||
self.cmd.gid = gid;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
|
128
crates/libp9cpu/src/launcher.rs
Normal file
128
crates/libp9cpu/src/launcher.rs
Normal file
|
@ -0,0 +1,128 @@
|
|||
use justerror::Error;
|
||||
use log::warn;
|
||||
use nix::mount::{mount, MsFlags};
|
||||
use nix::sched::{unshare, CloneFlags};
|
||||
use nix::unistd::setsid;
|
||||
use std::ffi::{CString, OsStr};
|
||||
use std::net::TcpStream;
|
||||
use std::os::unix::prelude::OsStrExt;
|
||||
use std::{io::Read, process::Command};
|
||||
|
||||
use crate::cmd;
|
||||
|
||||
pub trait Launch {
|
||||
fn launch(&self, port: u16) -> Command;
|
||||
}
|
||||
|
||||
fn parse_fstab_opt(opt: &str) -> (MsFlags, String) {
|
||||
let mut opts = vec![];
|
||||
let mut flag = MsFlags::empty();
|
||||
for f in opt.split(',') {
|
||||
if f == "defaults" {
|
||||
continue;
|
||||
}
|
||||
if f == "bind" {
|
||||
flag |= MsFlags::MS_BIND;
|
||||
} else {
|
||||
opts.push(f);
|
||||
}
|
||||
}
|
||||
(flag, opts.join(","))
|
||||
}
|
||||
|
||||
fn get_ptr(s: &str) -> Option<&str> {
|
||||
if s.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(s)
|
||||
}
|
||||
}
|
||||
|
||||
#[Error]
|
||||
pub enum Error {
|
||||
Unshare {
|
||||
flag: &'static str,
|
||||
#[source]
|
||||
err: nix::Error,
|
||||
},
|
||||
MountRoot(#[source] nix::Error),
|
||||
UnexpectedBytes,
|
||||
ShutdownSocket(#[source] std::io::Error),
|
||||
ConvertCString(#[source] std::ffi::NulError),
|
||||
Exec(#[source] nix::Error),
|
||||
ConnectToPort(#[source] std::io::Error),
|
||||
ReadUds(#[source] std::io::Error),
|
||||
ProtoDecode(#[source] prost::DecodeError),
|
||||
SetSid(#[source] nix::Error),
|
||||
SetControllingTerminal(#[source] nix::Error),
|
||||
SetUid(#[source] nix::Error),
|
||||
SetGid(#[source] nix::Error),
|
||||
}
|
||||
|
||||
pub fn connect(port: u16) -> Result<(), Error> {
|
||||
unshare(CloneFlags::CLONE_NEWNS).map_err(|err| Error::Unshare {
|
||||
flag: "CLONE_NEWNS",
|
||||
err,
|
||||
})?;
|
||||
let mut stream = TcpStream::connect(std::net::SocketAddrV4::new(
|
||||
std::net::Ipv4Addr::LOCALHOST,
|
||||
port,
|
||||
))
|
||||
.map_err(Error::ConnectToPort)?;
|
||||
let mut buf = [0; std::mem::size_of::<usize>()];
|
||||
stream.read_exact(&mut buf).map_err(Error::ReadUds)?;
|
||||
let length = usize::from_le_bytes(buf);
|
||||
let mut buf = vec![0; length];
|
||||
stream
|
||||
.read_exact(&mut buf[0..length])
|
||||
.map_err(Error::ReadUds)?;
|
||||
let cmd: cmd::Cmd = prost::Message::decode(buf.as_slice()).map_err(Error::ProtoDecode)?;
|
||||
mount::<str, str, str, str>(None, "/", None, MsFlags::MS_REC | MsFlags::MS_PRIVATE, None)
|
||||
.map_err(Error::MountRoot)?;
|
||||
|
||||
for tab in &cmd.fstab {
|
||||
let (flags, data) = parse_fstab_opt(&tab.mntops);
|
||||
if let Err(err) = nix::mount::mount(
|
||||
get_ptr(&tab.spec),
|
||||
tab.file.as_str(),
|
||||
get_ptr(&tab.vfstype),
|
||||
flags,
|
||||
get_ptr(&data),
|
||||
) {
|
||||
warn!("Mounting {:?}: {:?}", tab, err)
|
||||
}
|
||||
}
|
||||
let mut buf = [0];
|
||||
match stream.read(&mut buf) {
|
||||
Ok(0) => {}
|
||||
_ => return Err(Error::UnexpectedBytes),
|
||||
}
|
||||
stream
|
||||
.shutdown(std::net::Shutdown::Both)
|
||||
.map_err(Error::ShutdownSocket)?;
|
||||
|
||||
for env in &cmd.envs {
|
||||
std::env::set_var(OsStr::from_bytes(&env.key), OsStr::from_bytes(&env.val));
|
||||
}
|
||||
let arg0_c = CString::new(cmd.program).map_err(Error::ConvertCString)?;
|
||||
let mut args_c = vec![];
|
||||
for arg in cmd.args {
|
||||
args_c.push(CString::new(arg).map_err(Error::ConvertCString)?);
|
||||
}
|
||||
let mut full_args = vec![arg0_c.as_c_str()];
|
||||
for arg_c in &args_c {
|
||||
full_args.push(&arg_c);
|
||||
}
|
||||
|
||||
if cmd.tty {
|
||||
setsid().map_err(Error::SetSid)?;
|
||||
nix::ioctl_none_bad!(tiocsctty, libc::TIOCSCTTY);
|
||||
unsafe { tiocsctty(libc::STDIN_FILENO) }.map_err(Error::SetControllingTerminal)?;
|
||||
}
|
||||
|
||||
nix::unistd::setgid(cmd.gid.into()).map_err(Error::SetGid)?;
|
||||
nix::unistd::setuid(cmd.uid.into()).map_err(Error::SetUid)?;
|
||||
|
||||
nix::unistd::execvp(&arg0_c, &full_args).map_err(Error::Exec)?;
|
||||
Ok(())
|
||||
}
|
|
@ -1,14 +1,15 @@
|
|||
mod async_fd;
|
||||
pub mod client;
|
||||
pub mod cmd;
|
||||
pub mod launcher;
|
||||
mod rpc;
|
||||
pub mod server;
|
||||
|
||||
use log::warn;
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::collections::HashSet;
|
||||
use log::warn;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Addr {
|
||||
|
@ -71,4 +72,4 @@ pub fn parse_namespace(namespace: &str, tmp_mnt: &str) -> Vec<cmd::FsTab> {
|
|||
});
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
|
|
@ -102,11 +102,7 @@ impl crate::client::ClientInnerT for RpcClient {
|
|||
Ok(sid)
|
||||
}
|
||||
|
||||
async fn start(
|
||||
&self,
|
||||
sid: Self::SessionId,
|
||||
command: cmd::Cmd,
|
||||
) -> Result<(), Self::Error> {
|
||||
async fn start(&self, sid: Self::SessionId, command: cmd::Cmd) -> Result<(), Self::Error> {
|
||||
let req = rpc::StartRequest {
|
||||
id: sid.into_bytes().into(),
|
||||
cmd: Some(command),
|
||||
|
|
|
@ -7,12 +7,12 @@ use crate::Addr;
|
|||
use async_trait::async_trait;
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use std::pin::Pin;
|
||||
use std::task::Poll;
|
||||
use thiserror::Error;
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
use tokio::net::UnixListener;
|
||||
use tokio_stream::wrappers::UnixListenerStream;
|
||||
use tokio_vsock::{VsockListener, VsockStream};
|
||||
use std::task::Poll;
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
|
||||
struct VsockListenerStream {
|
||||
listener: VsockListener,
|
||||
|
@ -33,7 +33,6 @@ impl Stream for VsockListenerStream {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("RPC error: {0}")]
|
||||
|
@ -47,8 +46,11 @@ pub struct RpcServer {}
|
|||
#[async_trait]
|
||||
impl crate::server::P9cpuServerT for RpcServer {
|
||||
type Error = Error;
|
||||
async fn serve(&self, addr: Addr) -> Result<(), Error> {
|
||||
let p9cpu_service = rpc::p9cpu_server::P9cpuServer::new(P9cpuService::default());
|
||||
async fn serve<L>(&self, addr: Addr, launcher: L) -> Result<(), Error>
|
||||
where
|
||||
L: crate::launcher::Launch + Send + Sync + 'static,
|
||||
{
|
||||
let p9cpu_service = rpc::p9cpu_server::P9cpuServer::new(P9cpuService::new(launcher));
|
||||
let router = tonic::transport::Server::builder().add_service(p9cpu_service);
|
||||
match addr {
|
||||
Addr::Tcp(addr) => router.serve(addr).await?,
|
||||
|
@ -79,13 +81,24 @@ fn vec_to_uuid(v: &Vec<u8>) -> Result<uuid::Uuid, Status> {
|
|||
uuid::Uuid::from_slice(v).map_err(|e| Status::invalid_argument(e.to_string()))
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct P9cpuService {
|
||||
server: server::Server<uuid::Uuid>,
|
||||
#[derive(Debug)]
|
||||
pub struct P9cpuService<L> {
|
||||
server: server::Server<uuid::Uuid, L>,
|
||||
}
|
||||
|
||||
impl<L> P9cpuService<L> {
|
||||
pub(crate) fn new(launcher: L) -> Self {
|
||||
P9cpuService {
|
||||
server: server::Server::new(launcher),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl rpc::p9cpu_server::P9cpu for P9cpuService {
|
||||
impl<L> rpc::p9cpu_server::P9cpu for P9cpuService<L>
|
||||
where
|
||||
L: crate::launcher::Launch + Send + Sync + 'static,
|
||||
{
|
||||
type StdoutStream = Pin<Box<dyn Stream<Item = Result<rpc::Bytes, Status>> + Send>>;
|
||||
type StderrStream = Pin<Box<dyn Stream<Item = Result<rpc::Bytes, Status>> + Send>>;
|
||||
type NinepForwardStream = Pin<Box<dyn Stream<Item = Result<rpc::Bytes, Status>> + Send>>;
|
||||
|
|
|
@ -1,63 +1,299 @@
|
|||
use crate::async_fd::AsyncFd;
|
||||
use crate::cmd;
|
||||
use async_trait::async_trait;
|
||||
use futures::Stream;
|
||||
use prost::Message;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::hash::Hash;
|
||||
use std::os::unix::prelude::{FromRawFd, OwnedFd};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
|
||||
process::{Child, Command},
|
||||
sync::{mpsc, RwLock},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
|
||||
|
||||
#[async_trait]
|
||||
pub trait P9cpuServerT {
|
||||
type Error: std::error::Error;
|
||||
async fn serve(&self, addr: crate::Addr) -> Result<(), Self::Error>;
|
||||
async fn serve<L>(&self, addr: crate::Addr, launcher: L) -> Result<(), Self::Error>
|
||||
where
|
||||
L: crate::launcher::Launch + Send + Sync + 'static;
|
||||
}
|
||||
|
||||
// enum ChildStdio {
|
||||
// Piped(OwnedFd, OwnedFd, OwnedFd),
|
||||
// Pty(OwnedFd),
|
||||
// }
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Session {
|
||||
stdin: Arc<RwLock<AsyncFd>>,
|
||||
stdout: Arc<RwLock<AsyncFd>>,
|
||||
stderr: Arc<RwLock<Option<AsyncFd>>>,
|
||||
child: Arc<RwLock<Child>>,
|
||||
handles: Arc<RwLock<Vec<JoinHandle<Result<(), Error>>>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PendingSession {
|
||||
ninep: Arc<RwLock<Option<(u16, JoinHandle<Result<(), Error>>)>>>,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Server<I> {
|
||||
_phantom: Option<I>,
|
||||
pub enum Error {
|
||||
#[error("Failed to spawn: {0}")]
|
||||
SpawnFail(#[source] std::io::Error),
|
||||
#[error("Session does not exist")]
|
||||
SessionNotExist,
|
||||
#[error("IO Error: {0}")]
|
||||
IoErr(#[source] std::io::Error),
|
||||
#[error("Duplicate session id")]
|
||||
DuplicateId,
|
||||
#[error("Command exited without return code.")]
|
||||
NoReturnCode,
|
||||
#[error("Command error: {0}")]
|
||||
CommandError(#[source] std::io::Error),
|
||||
#[error("Cannot open pty device")]
|
||||
OpenPtyFail(#[from] nix::Error),
|
||||
#[error("Cannot clone file descriptor: {0}")]
|
||||
FdCloneFail(#[source] std::io::Error),
|
||||
#[error("Cannot create directory: {0}")]
|
||||
MkDir(#[source] std::io::Error),
|
||||
#[error("Invalid FsTab: {0}")]
|
||||
InvalidFsTab(String),
|
||||
#[error("Cannot bind listener: {0}")]
|
||||
BindFail(#[source] std::io::Error),
|
||||
#[error("9p forward not setup")]
|
||||
No9pPort,
|
||||
#[error("String contains null: {0:?}")]
|
||||
StringContainsNull(#[from] std::ffi::NulError),
|
||||
#[error("Channel closed")]
|
||||
ChannelClosed,
|
||||
}
|
||||
|
||||
impl<I> Default for Server<I> {
|
||||
fn default() -> Self {
|
||||
Self { _phantom: None }
|
||||
impl<T> From<mpsc::error::SendError<T>> for Error {
|
||||
fn from(_: mpsc::error::SendError<T>) -> Self {
|
||||
Self::ChannelClosed
|
||||
}
|
||||
}
|
||||
|
||||
impl<I> Server<I> {
|
||||
pub async fn start(&self, _command: cmd::Cmd, _sid: I) -> Result<(), Error> {
|
||||
unimplemented!()
|
||||
#[derive(Debug)]
|
||||
pub struct Server<I, L> {
|
||||
launcher: L,
|
||||
sessions: Arc<RwLock<HashMap<I, Session>>>,
|
||||
pending: Arc<RwLock<HashMap<I, PendingSession>>>,
|
||||
}
|
||||
|
||||
impl<I, L> Server<I, L> {
|
||||
pub fn new(launcher: L) -> Self {
|
||||
Self {
|
||||
sessions: Arc::new(RwLock::new(HashMap::new())),
|
||||
pending: Arc::new(RwLock::new(HashMap::new())),
|
||||
launcher,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, L> Server<I, L>
|
||||
where
|
||||
L: crate::launcher::Launch,
|
||||
I: Eq + Hash + Debug + Display + Sync + Send + Clone + 'static,
|
||||
{
|
||||
async fn get_session<O, R>(&self, sid: &I, op: O) -> Result<R, Error>
|
||||
where
|
||||
O: Fn(&Session) -> R,
|
||||
{
|
||||
let sessions = self.sessions.read().await;
|
||||
let info = sessions.get(sid).ok_or(Error::SessionNotExist)?;
|
||||
Ok(op(info))
|
||||
}
|
||||
|
||||
async fn copy_to(
|
||||
src: &mut (impl AsyncRead + Unpin),
|
||||
tx: mpsc::Sender<Vec<u8>>,
|
||||
) -> Result<(), Error> {
|
||||
loop {
|
||||
let mut buf = vec![0; 128];
|
||||
let len = src.read(&mut buf).await.map_err(Error::IoErr)?;
|
||||
if len == 0 {
|
||||
break;
|
||||
}
|
||||
buf.truncate(len);
|
||||
tx.send(buf).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start(&self, cmd: cmd::Cmd, sid: I) -> Result<(), Error> {
|
||||
let Some(PendingSession { ninep }) = self.pending.write().await.remove(&sid) else {
|
||||
return Err(Error::SessionNotExist);
|
||||
};
|
||||
let mut sessions = self.sessions.write().await;
|
||||
if sessions.contains_key(&sid) {
|
||||
return Err(Error::DuplicateId);
|
||||
}
|
||||
let mut handles = vec![];
|
||||
let mut ninep_port = None;
|
||||
if let Some((port, handle)) = ninep.write().await.take() {
|
||||
ninep_port = Some(port);
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
let listener = TcpListener::bind(std::net::SocketAddrV4::new(
|
||||
std::net::Ipv4Addr::LOCALHOST,
|
||||
0,
|
||||
))
|
||||
.await
|
||||
.map_err(Error::BindFail)?;
|
||||
let port = listener.local_addr().map_err(Error::BindFail)?.port();
|
||||
|
||||
let command = self.launcher.launch(port);
|
||||
let mut command = Command::from(command);
|
||||
if cmd.ninep {
|
||||
let Some(_ninep_port) = ninep_port else {
|
||||
return Err(Error::No9pPort);
|
||||
};
|
||||
}
|
||||
|
||||
let (stdin, stdout, stderr) = if cmd.tty {
|
||||
let result = nix::pty::openpty(None, None).map_err(Error::OpenPtyFail)?;
|
||||
let stdin = unsafe { OwnedFd::from_raw_fd(result.slave) };
|
||||
let stdout = stdin.try_clone().map_err(Error::FdCloneFail)?;
|
||||
let stderr = stdin.try_clone().map_err(Error::FdCloneFail)?;
|
||||
command.stdin(stdin).stdout(stdout).stderr(stderr);
|
||||
let master = unsafe { OwnedFd::from_raw_fd(result.master) };
|
||||
let master_copy = master.try_clone().map_err(Error::FdCloneFail)?;
|
||||
(
|
||||
AsyncFd::try_from(master).map_err(Error::IoErr)?,
|
||||
AsyncFd::try_from(master_copy).map_err(Error::IoErr)?,
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
let (stdin_rd, stdin_wr) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?;
|
||||
let (stdout_rd, stdout_wr) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?;
|
||||
let (stderr_rd, stderr_wr) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?;
|
||||
let stdin = unsafe { OwnedFd::from_raw_fd(stdin_rd) };
|
||||
let stdout = unsafe { OwnedFd::from_raw_fd(stdout_wr) };
|
||||
let stderr = unsafe { OwnedFd::from_raw_fd(stderr_wr) };
|
||||
command.stdin(stdin).stdout(stdout).stderr(stderr);
|
||||
(
|
||||
AsyncFd::try_from(unsafe { OwnedFd::from_raw_fd(stdin_wr) })
|
||||
.map_err(Error::IoErr)?,
|
||||
AsyncFd::try_from(unsafe { OwnedFd::from_raw_fd(stdout_rd) })
|
||||
.map_err(Error::IoErr)?,
|
||||
Some(
|
||||
AsyncFd::try_from(unsafe { OwnedFd::from_raw_fd(stderr_rd) })
|
||||
.map_err(Error::IoErr)?,
|
||||
),
|
||||
)
|
||||
};
|
||||
|
||||
let child = command.spawn().map_err(Error::SpawnFail)?;
|
||||
|
||||
let (mut stream, _) = listener.accept().await.map_err(Error::IoErr)?;
|
||||
let buf = cmd.encode_to_vec();
|
||||
let size_buf = buf.len().to_le_bytes();
|
||||
stream.write_all(&size_buf).await.map_err(Error::IoErr)?;
|
||||
stream.write_all(&buf).await.map_err(Error::IoErr)?;
|
||||
stream.shutdown().await.map_err(Error::IoErr)?;
|
||||
drop(stream);
|
||||
drop(listener);
|
||||
|
||||
let info = Session {
|
||||
stdin: Arc::new(RwLock::new(stdin)),
|
||||
stdout: Arc::new(RwLock::new(stdout)),
|
||||
stderr: Arc::new(RwLock::new(stderr)),
|
||||
child: Arc::new(RwLock::new(child)),
|
||||
handles: Arc::new(RwLock::new(handles)),
|
||||
};
|
||||
log::info!("Session {} started", &sid);
|
||||
sessions.insert(sid, info);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stdin(
|
||||
&self,
|
||||
_sid: &I,
|
||||
mut _in_stream: impl Stream<Item = Vec<u8>> + Unpin,
|
||||
sid: &I,
|
||||
mut in_stream: impl Stream<Item = Vec<u8>> + Unpin,
|
||||
) -> Result<(), Error> {
|
||||
unimplemented!()
|
||||
let cmd_stdin = self.get_session(sid, |s| s.stdin.clone()).await?;
|
||||
let mut cmd_stdin = cmd_stdin.write().await;
|
||||
log::debug!("Session {} stdin stream started", sid);
|
||||
while let Some(item) = in_stream.next().await {
|
||||
cmd_stdin.write_all(&item).await.map_err(Error::IoErr)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stdout(&self, _sid: &I) -> Result<impl Stream<Item = Vec<u8>>, Error> {
|
||||
// Not implemented yet
|
||||
let (_tx, rx) = mpsc::channel(10);
|
||||
pub async fn stdout(&self, sid: &I) -> Result<impl Stream<Item = Vec<u8>>, Error> {
|
||||
let cmd_stdout = self.get_session(sid, |s| s.stdout.clone()).await?;
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
let sid_copy = sid.clone();
|
||||
let out_handle = tokio::spawn(async move {
|
||||
let mut out = cmd_stdout.write().await;
|
||||
log::debug!("Session {} stdout stream started", &sid_copy);
|
||||
Self::copy_to(&mut *out, tx).await
|
||||
});
|
||||
let handles = self.get_session(sid, |s| s.handles.clone()).await?;
|
||||
handles.write().await.push(out_handle);
|
||||
let stream = ReceiverStream::new(rx);
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
pub async fn stderr(&self, _sid: &I) -> Result<impl Stream<Item = Vec<u8>>, Error> {
|
||||
// Not implemented yet
|
||||
let (_tx, rx) = mpsc::channel(10);
|
||||
pub async fn stderr(&self, sid: &I) -> Result<impl Stream<Item = Vec<u8>>, Error> {
|
||||
let cmd_stderr = self.get_session(sid, |s| s.stderr.clone()).await?;
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
let sid_copy = sid.clone();
|
||||
let err_handle = tokio::spawn(async move {
|
||||
let mut err = cmd_stderr.write().await;
|
||||
let Some(ref mut err) = &mut *err else {
|
||||
log::info!("Session {} has no stderr", &sid_copy);
|
||||
return Ok(());
|
||||
};
|
||||
log::debug!("Session {} stderr stream started", &sid_copy);
|
||||
Self::copy_to(&mut *err, tx).await
|
||||
});
|
||||
let handles = self.get_session(sid, |s| s.handles.clone()).await?;
|
||||
handles.write().await.push(err_handle);
|
||||
let stream = ReceiverStream::new(rx);
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
pub async fn dial(&self, _sid: I) -> Result<(), Error> {
|
||||
unimplemented!()
|
||||
pub async fn dial(&self, sid: I) -> Result<(), Error> {
|
||||
let mut pending = self.pending.write().await;
|
||||
if pending.contains_key(&sid) {
|
||||
return Err(Error::DuplicateId);
|
||||
}
|
||||
let session = PendingSession {
|
||||
ninep: Arc::new(RwLock::new(None)),
|
||||
};
|
||||
log::info!("Session {} created", &sid);
|
||||
pending.insert(sid, session);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn wait(&self, _sid: &I) -> Result<i32, Error> {
|
||||
unimplemented!()
|
||||
pub async fn wait(&self, sid: &I) -> Result<i32, Error> {
|
||||
let child = self.get_session(sid, |s| s.child.clone()).await?;
|
||||
let ret = match child.write().await.wait().await {
|
||||
Ok(status) => status.code().ok_or(Error::NoReturnCode),
|
||||
Err(e) => Err(Error::CommandError(e)),
|
||||
};
|
||||
println!("child is done");
|
||||
let handles = self.get_session(sid, |s| s.handles.clone()).await?;
|
||||
for handle in handles.write().await.iter_mut() {
|
||||
if let Err(e) = handle.await {
|
||||
eprintln!("handle join error {:?}", e);
|
||||
}
|
||||
}
|
||||
self.sessions.write().await.remove(sid);
|
||||
log::info!("Session {} is done", &sid);
|
||||
ret
|
||||
}
|
||||
|
||||
pub async fn ninep_forward(
|
||||
|
@ -71,3 +307,7 @@ impl<I> Server<I> {
|
|||
Ok(stream)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rpc_based() -> crate::rpc::rpc_server::RpcServer {
|
||||
crate::rpc::rpc_server::RpcServer {}
|
||||
}
|
||||
|
|
|
@ -13,3 +13,4 @@ clap = { version = "4", features = ["derive"] }
|
|||
anyhow = "1"
|
||||
tokio = { version = "1", features = ["macros", "rt-multi-thread","process","io-std"] }
|
||||
tokio-vsock = { version = "0.4", features = ["tonic-conn"] }
|
||||
nix = "0.26.2"
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
use anyhow::{Result, bail};
|
||||
use anyhow::{bail, Result};
|
||||
use clap::Parser;
|
||||
use libp9cpu::parse_namespace;
|
||||
use libp9cpu::cmd::{Command, FsTab};
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use libp9cpu::parse_namespace;
|
||||
use std::os::unix::prelude::OsStringExt;
|
||||
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
|
||||
#[derive(clap::ValueEnum, Clone, Debug)]
|
||||
enum Net {
|
||||
|
@ -34,6 +33,12 @@ struct Args {
|
|||
#[arg(long, default_value = "/tmp")]
|
||||
tmp_mnt: String,
|
||||
|
||||
#[arg(long, default_value_t = nix::unistd::geteuid().as_raw())]
|
||||
uid: u32,
|
||||
|
||||
#[arg(long, default_value_t = nix::unistd::getegid().as_raw())]
|
||||
gid: u32,
|
||||
|
||||
#[arg()]
|
||||
host: String,
|
||||
|
||||
|
@ -77,6 +82,7 @@ async fn app(args: Args) -> Result<()> {
|
|||
cmd.fstab(fs_tab_lines);
|
||||
cmd.tty(args.tty);
|
||||
cmd.tmp_mnt(args.tmp_mnt);
|
||||
cmd.ugid(args.uid, args.gid);
|
||||
|
||||
let mut client = libp9cpu::client::rpc_based(addr).await?;
|
||||
client.start(cmd).await?;
|
||||
|
|
|
@ -13,3 +13,5 @@ anyhow = "1"
|
|||
libp9cpu = { version = "*", path = "../libp9cpu" }
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
tokio = { version = "1", features = ["macros", "rt-multi-thread","process","io-std"] }
|
||||
tokio-vsock = { version = "0.4", features = ["tonic-conn"] }
|
||||
vsock = "0.3.0"
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use libp9cpu::server::P9cpuServerT;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
use clap::Parser;
|
||||
|
@ -18,10 +20,46 @@ struct Args {
|
|||
port: u32,
|
||||
#[arg(long)]
|
||||
uds: Option<String>,
|
||||
#[arg(long, hide = true)]
|
||||
launch: Option<u16>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let args = Args::parse();
|
||||
unimplemented!("not implemented: {:?}", args);
|
||||
struct Launcher {}
|
||||
impl libp9cpu::launcher::Launch for Launcher {
|
||||
fn launch(&self, port: u16) -> std::process::Command {
|
||||
let arg0 = std::env::args().next().unwrap();
|
||||
let mut sub_cmd = std::process::Command::new(arg0);
|
||||
sub_cmd.arg("--launch").arg(port.to_string());
|
||||
sub_cmd
|
||||
}
|
||||
}
|
||||
|
||||
async fn app(args: Args) -> Result<()> {
|
||||
let addr = match args.net {
|
||||
Net::Vsock => libp9cpu::Addr::Vsock(tokio_vsock::VsockAddr::new(
|
||||
vsock::VMADDR_CID_ANY,
|
||||
args.port,
|
||||
)),
|
||||
Net::Tcp => libp9cpu::Addr::Tcp(format!("[::]:{}", args.port).parse().unwrap()),
|
||||
Net::Unix => libp9cpu::Addr::Uds(args.uds.unwrap()),
|
||||
};
|
||||
let launcher = Launcher {};
|
||||
let server = libp9cpu::server::rpc_based();
|
||||
server.serve(addr, launcher).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let args = Args::parse();
|
||||
if let Some(port) = args.launch {
|
||||
libp9cpu::launcher::connect(port)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
let ret = runtime.block_on(app(args));
|
||||
ret
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue