From ecfd90d8026f081457c2ea4298792bd8e7deee85 Mon Sep 17 00:00:00 2001 From: Changyuan Lyu Date: Fri, 26 May 2023 16:03:08 -0700 Subject: [PATCH] p9cpud: implement the server Signed-off-by: Changyuan Lyu --- Cargo.lock | 15 ++ crates/libp9cpu/Cargo.toml | 1 + crates/libp9cpu/src/cmd.proto | 2 + crates/libp9cpu/src/cmd.rs | 6 + crates/libp9cpu/src/launcher.rs | 128 +++++++++++ crates/libp9cpu/src/lib.rs | 7 +- crates/libp9cpu/src/rpc/rpc_client.rs | 6 +- crates/libp9cpu/src/rpc/rpc_server.rs | 31 ++- crates/libp9cpu/src/server.rs | 294 +++++++++++++++++++++++--- crates/p9cpu/Cargo.toml | 1 + crates/p9cpu/src/p9cpu.rs | 14 +- crates/p9cpud/Cargo.toml | 2 + crates/p9cpud/src/p9cpud.rs | 46 +++- 13 files changed, 501 insertions(+), 52 deletions(-) create mode 100644 crates/libp9cpu/src/launcher.rs diff --git a/Cargo.lock b/Cargo.lock index 5733a97..02e752f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -479,6 +479,17 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" +[[package]] +name = "justerror" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "befab2078d3ff679889e32730a1e9107b06a60a18ed7dfa3384f66ebe5e1062a" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -497,6 +508,7 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", + "justerror", "libc", "log", "nix 0.26.2", @@ -636,6 +648,7 @@ dependencies = [ "anyhow", "clap", "libp9cpu", + "nix 0.26.2", "tokio", "tokio-vsock", ] @@ -648,6 +661,8 @@ dependencies = [ "clap", "libp9cpu", "tokio", + "tokio-vsock", + "vsock", ] [[package]] diff --git a/crates/libp9cpu/Cargo.toml b/crates/libp9cpu/Cargo.toml index da724c3..48e38a9 100644 --- a/crates/libp9cpu/Cargo.toml +++ b/crates/libp9cpu/Cargo.toml @@ -17,6 +17,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread","process","io-s tokio-vsock = { version = "0.4", features = ["tonic-conn"] } libc = "0.2.142" thiserror = "1" +justerror = "1" uuid = { version = "1", features = ["v4", "fast-rng", "macro-diagnostics"] } log = "0.4" tokio-stream = { version = "0.1.12", features = ["net"] } diff --git a/crates/libp9cpu/src/cmd.proto b/crates/libp9cpu/src/cmd.proto index b53f579..cdf851e 100644 --- a/crates/libp9cpu/src/cmd.proto +++ b/crates/libp9cpu/src/cmd.proto @@ -23,4 +23,6 @@ message Cmd { bool ninep = 5; bool tty = 6; string tmp_mnt = 7; + uint32 uid = 8; + uint32 gid = 9; } diff --git a/crates/libp9cpu/src/cmd.rs b/crates/libp9cpu/src/cmd.rs index 98f2881..f60f15e 100644 --- a/crates/libp9cpu/src/cmd.rs +++ b/crates/libp9cpu/src/cmd.rs @@ -93,4 +93,10 @@ impl Command { self.cmd.tmp_mnt = tmp_mnt; self } + + pub fn ugid(&mut self, uid: u32, gid: u32) -> &mut Self { + self.cmd.uid = uid; + self.cmd.gid = gid; + self + } } diff --git a/crates/libp9cpu/src/launcher.rs b/crates/libp9cpu/src/launcher.rs new file mode 100644 index 0000000..ec4deec --- /dev/null +++ b/crates/libp9cpu/src/launcher.rs @@ -0,0 +1,128 @@ +use justerror::Error; +use log::warn; +use nix::mount::{mount, MsFlags}; +use nix::sched::{unshare, CloneFlags}; +use nix::unistd::setsid; +use std::ffi::{CString, OsStr}; +use std::net::TcpStream; +use std::os::unix::prelude::OsStrExt; +use std::{io::Read, process::Command}; + +use crate::cmd; + +pub trait Launch { + fn launch(&self, port: u16) -> Command; +} + +fn parse_fstab_opt(opt: &str) -> (MsFlags, String) { + let mut opts = vec![]; + let mut flag = MsFlags::empty(); + for f in opt.split(',') { + if f == "defaults" { + continue; + } + if f == "bind" { + flag |= MsFlags::MS_BIND; + } else { + opts.push(f); + } + } + (flag, opts.join(",")) +} + +fn get_ptr(s: &str) -> Option<&str> { + if s.is_empty() { + None + } else { + Some(s) + } +} + +#[Error] +pub enum Error { + Unshare { + flag: &'static str, + #[source] + err: nix::Error, + }, + MountRoot(#[source] nix::Error), + UnexpectedBytes, + ShutdownSocket(#[source] std::io::Error), + ConvertCString(#[source] std::ffi::NulError), + Exec(#[source] nix::Error), + ConnectToPort(#[source] std::io::Error), + ReadUds(#[source] std::io::Error), + ProtoDecode(#[source] prost::DecodeError), + SetSid(#[source] nix::Error), + SetControllingTerminal(#[source] nix::Error), + SetUid(#[source] nix::Error), + SetGid(#[source] nix::Error), +} + +pub fn connect(port: u16) -> Result<(), Error> { + unshare(CloneFlags::CLONE_NEWNS).map_err(|err| Error::Unshare { + flag: "CLONE_NEWNS", + err, + })?; + let mut stream = TcpStream::connect(std::net::SocketAddrV4::new( + std::net::Ipv4Addr::LOCALHOST, + port, + )) + .map_err(Error::ConnectToPort)?; + let mut buf = [0; std::mem::size_of::()]; + stream.read_exact(&mut buf).map_err(Error::ReadUds)?; + let length = usize::from_le_bytes(buf); + let mut buf = vec![0; length]; + stream + .read_exact(&mut buf[0..length]) + .map_err(Error::ReadUds)?; + let cmd: cmd::Cmd = prost::Message::decode(buf.as_slice()).map_err(Error::ProtoDecode)?; + mount::(None, "/", None, MsFlags::MS_REC | MsFlags::MS_PRIVATE, None) + .map_err(Error::MountRoot)?; + + for tab in &cmd.fstab { + let (flags, data) = parse_fstab_opt(&tab.mntops); + if let Err(err) = nix::mount::mount( + get_ptr(&tab.spec), + tab.file.as_str(), + get_ptr(&tab.vfstype), + flags, + get_ptr(&data), + ) { + warn!("Mounting {:?}: {:?}", tab, err) + } + } + let mut buf = [0]; + match stream.read(&mut buf) { + Ok(0) => {} + _ => return Err(Error::UnexpectedBytes), + } + stream + .shutdown(std::net::Shutdown::Both) + .map_err(Error::ShutdownSocket)?; + + for env in &cmd.envs { + std::env::set_var(OsStr::from_bytes(&env.key), OsStr::from_bytes(&env.val)); + } + let arg0_c = CString::new(cmd.program).map_err(Error::ConvertCString)?; + let mut args_c = vec![]; + for arg in cmd.args { + args_c.push(CString::new(arg).map_err(Error::ConvertCString)?); + } + let mut full_args = vec![arg0_c.as_c_str()]; + for arg_c in &args_c { + full_args.push(&arg_c); + } + + if cmd.tty { + setsid().map_err(Error::SetSid)?; + nix::ioctl_none_bad!(tiocsctty, libc::TIOCSCTTY); + unsafe { tiocsctty(libc::STDIN_FILENO) }.map_err(Error::SetControllingTerminal)?; + } + + nix::unistd::setgid(cmd.gid.into()).map_err(Error::SetGid)?; + nix::unistd::setuid(cmd.uid.into()).map_err(Error::SetUid)?; + + nix::unistd::execvp(&arg0_c, &full_args).map_err(Error::Exec)?; + Ok(()) +} diff --git a/crates/libp9cpu/src/lib.rs b/crates/libp9cpu/src/lib.rs index bff2476..ba2b078 100644 --- a/crates/libp9cpu/src/lib.rs +++ b/crates/libp9cpu/src/lib.rs @@ -1,14 +1,15 @@ mod async_fd; pub mod client; pub mod cmd; +pub mod launcher; mod rpc; pub mod server; +use log::warn; +use std::collections::HashSet; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use std::collections::HashSet; -use log::warn; #[derive(Debug)] pub enum Addr { @@ -71,4 +72,4 @@ pub fn parse_namespace(namespace: &str, tmp_mnt: &str) -> Vec { }); } result -} \ No newline at end of file +} diff --git a/crates/libp9cpu/src/rpc/rpc_client.rs b/crates/libp9cpu/src/rpc/rpc_client.rs index 57e9b40..b40430c 100644 --- a/crates/libp9cpu/src/rpc/rpc_client.rs +++ b/crates/libp9cpu/src/rpc/rpc_client.rs @@ -102,11 +102,7 @@ impl crate::client::ClientInnerT for RpcClient { Ok(sid) } - async fn start( - &self, - sid: Self::SessionId, - command: cmd::Cmd, - ) -> Result<(), Self::Error> { + async fn start(&self, sid: Self::SessionId, command: cmd::Cmd) -> Result<(), Self::Error> { let req = rpc::StartRequest { id: sid.into_bytes().into(), cmd: Some(command), diff --git a/crates/libp9cpu/src/rpc/rpc_server.rs b/crates/libp9cpu/src/rpc/rpc_server.rs index 1e611c3..cca6401 100644 --- a/crates/libp9cpu/src/rpc/rpc_server.rs +++ b/crates/libp9cpu/src/rpc/rpc_server.rs @@ -7,12 +7,12 @@ use crate::Addr; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; use std::pin::Pin; +use std::task::Poll; use thiserror::Error; -use tonic::{Request, Response, Status, Streaming}; use tokio::net::UnixListener; use tokio_stream::wrappers::UnixListenerStream; use tokio_vsock::{VsockListener, VsockStream}; -use std::task::Poll; +use tonic::{Request, Response, Status, Streaming}; struct VsockListenerStream { listener: VsockListener, @@ -33,7 +33,6 @@ impl Stream for VsockListenerStream { } } - #[derive(Error, Debug)] pub enum Error { #[error("RPC error: {0}")] @@ -47,8 +46,11 @@ pub struct RpcServer {} #[async_trait] impl crate::server::P9cpuServerT for RpcServer { type Error = Error; - async fn serve(&self, addr: Addr) -> Result<(), Error> { - let p9cpu_service = rpc::p9cpu_server::P9cpuServer::new(P9cpuService::default()); + async fn serve(&self, addr: Addr, launcher: L) -> Result<(), Error> + where + L: crate::launcher::Launch + Send + Sync + 'static, + { + let p9cpu_service = rpc::p9cpu_server::P9cpuServer::new(P9cpuService::new(launcher)); let router = tonic::transport::Server::builder().add_service(p9cpu_service); match addr { Addr::Tcp(addr) => router.serve(addr).await?, @@ -79,13 +81,24 @@ fn vec_to_uuid(v: &Vec) -> Result { uuid::Uuid::from_slice(v).map_err(|e| Status::invalid_argument(e.to_string())) } -#[derive(Debug, Default)] -pub struct P9cpuService { - server: server::Server, +#[derive(Debug)] +pub struct P9cpuService { + server: server::Server, +} + +impl P9cpuService { + pub(crate) fn new(launcher: L) -> Self { + P9cpuService { + server: server::Server::new(launcher), + } + } } #[async_trait] -impl rpc::p9cpu_server::P9cpu for P9cpuService { +impl rpc::p9cpu_server::P9cpu for P9cpuService +where + L: crate::launcher::Launch + Send + Sync + 'static, +{ type StdoutStream = Pin> + Send>>; type StderrStream = Pin> + Send>>; type NinepForwardStream = Pin> + Send>>; diff --git a/crates/libp9cpu/src/server.rs b/crates/libp9cpu/src/server.rs index c2bd1e8..515ebba 100644 --- a/crates/libp9cpu/src/server.rs +++ b/crates/libp9cpu/src/server.rs @@ -1,63 +1,299 @@ +use crate::async_fd::AsyncFd; use crate::cmd; use async_trait::async_trait; use futures::Stream; +use prost::Message; +use std::fmt::{Debug, Display}; +use std::hash::Hash; +use std::os::unix::prelude::{FromRawFd, OwnedFd}; +use std::{collections::HashMap, sync::Arc}; use thiserror::Error; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; +use tokio::net::TcpListener; + +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, + process::{Child, Command}, + sync::{mpsc, RwLock}, + task::JoinHandle, +}; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; #[async_trait] pub trait P9cpuServerT { type Error: std::error::Error; - async fn serve(&self, addr: crate::Addr) -> Result<(), Self::Error>; + async fn serve(&self, addr: crate::Addr, launcher: L) -> Result<(), Self::Error> + where + L: crate::launcher::Launch + Send + Sync + 'static; +} + +// enum ChildStdio { +// Piped(OwnedFd, OwnedFd, OwnedFd), +// Pty(OwnedFd), +// } + +#[derive(Debug)] +pub struct Session { + stdin: Arc>, + stdout: Arc>, + stderr: Arc>>, + child: Arc>, + handles: Arc>>>>, +} + +#[derive(Debug)] +pub struct PendingSession { + ninep: Arc>)>>>, } #[derive(Error, Debug)] -pub enum Error {} - -#[derive(Debug)] -pub struct Server { - _phantom: Option, +pub enum Error { + #[error("Failed to spawn: {0}")] + SpawnFail(#[source] std::io::Error), + #[error("Session does not exist")] + SessionNotExist, + #[error("IO Error: {0}")] + IoErr(#[source] std::io::Error), + #[error("Duplicate session id")] + DuplicateId, + #[error("Command exited without return code.")] + NoReturnCode, + #[error("Command error: {0}")] + CommandError(#[source] std::io::Error), + #[error("Cannot open pty device")] + OpenPtyFail(#[from] nix::Error), + #[error("Cannot clone file descriptor: {0}")] + FdCloneFail(#[source] std::io::Error), + #[error("Cannot create directory: {0}")] + MkDir(#[source] std::io::Error), + #[error("Invalid FsTab: {0}")] + InvalidFsTab(String), + #[error("Cannot bind listener: {0}")] + BindFail(#[source] std::io::Error), + #[error("9p forward not setup")] + No9pPort, + #[error("String contains null: {0:?}")] + StringContainsNull(#[from] std::ffi::NulError), + #[error("Channel closed")] + ChannelClosed, } -impl Default for Server { - fn default() -> Self { - Self { _phantom: None } +impl From> for Error { + fn from(_: mpsc::error::SendError) -> Self { + Self::ChannelClosed } } -impl Server { - pub async fn start(&self, _command: cmd::Cmd, _sid: I) -> Result<(), Error> { - unimplemented!() +#[derive(Debug)] +pub struct Server { + launcher: L, + sessions: Arc>>, + pending: Arc>>, +} + +impl Server { + pub fn new(launcher: L) -> Self { + Self { + sessions: Arc::new(RwLock::new(HashMap::new())), + pending: Arc::new(RwLock::new(HashMap::new())), + launcher, + } + } +} + +impl Server +where + L: crate::launcher::Launch, + I: Eq + Hash + Debug + Display + Sync + Send + Clone + 'static, +{ + async fn get_session(&self, sid: &I, op: O) -> Result + where + O: Fn(&Session) -> R, + { + let sessions = self.sessions.read().await; + let info = sessions.get(sid).ok_or(Error::SessionNotExist)?; + Ok(op(info)) + } + + async fn copy_to( + src: &mut (impl AsyncRead + Unpin), + tx: mpsc::Sender>, + ) -> Result<(), Error> { + loop { + let mut buf = vec![0; 128]; + let len = src.read(&mut buf).await.map_err(Error::IoErr)?; + if len == 0 { + break; + } + buf.truncate(len); + tx.send(buf).await?; + } + Ok(()) + } + + pub async fn start(&self, cmd: cmd::Cmd, sid: I) -> Result<(), Error> { + let Some(PendingSession { ninep }) = self.pending.write().await.remove(&sid) else { + return Err(Error::SessionNotExist); + }; + let mut sessions = self.sessions.write().await; + if sessions.contains_key(&sid) { + return Err(Error::DuplicateId); + } + let mut handles = vec![]; + let mut ninep_port = None; + if let Some((port, handle)) = ninep.write().await.take() { + ninep_port = Some(port); + handles.push(handle); + } + + let listener = TcpListener::bind(std::net::SocketAddrV4::new( + std::net::Ipv4Addr::LOCALHOST, + 0, + )) + .await + .map_err(Error::BindFail)?; + let port = listener.local_addr().map_err(Error::BindFail)?.port(); + + let command = self.launcher.launch(port); + let mut command = Command::from(command); + if cmd.ninep { + let Some(_ninep_port) = ninep_port else { + return Err(Error::No9pPort); + }; + } + + let (stdin, stdout, stderr) = if cmd.tty { + let result = nix::pty::openpty(None, None).map_err(Error::OpenPtyFail)?; + let stdin = unsafe { OwnedFd::from_raw_fd(result.slave) }; + let stdout = stdin.try_clone().map_err(Error::FdCloneFail)?; + let stderr = stdin.try_clone().map_err(Error::FdCloneFail)?; + command.stdin(stdin).stdout(stdout).stderr(stderr); + let master = unsafe { OwnedFd::from_raw_fd(result.master) }; + let master_copy = master.try_clone().map_err(Error::FdCloneFail)?; + ( + AsyncFd::try_from(master).map_err(Error::IoErr)?, + AsyncFd::try_from(master_copy).map_err(Error::IoErr)?, + None, + ) + } else { + let (stdin_rd, stdin_wr) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + let (stdout_rd, stdout_wr) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + let (stderr_rd, stderr_wr) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + let stdin = unsafe { OwnedFd::from_raw_fd(stdin_rd) }; + let stdout = unsafe { OwnedFd::from_raw_fd(stdout_wr) }; + let stderr = unsafe { OwnedFd::from_raw_fd(stderr_wr) }; + command.stdin(stdin).stdout(stdout).stderr(stderr); + ( + AsyncFd::try_from(unsafe { OwnedFd::from_raw_fd(stdin_wr) }) + .map_err(Error::IoErr)?, + AsyncFd::try_from(unsafe { OwnedFd::from_raw_fd(stdout_rd) }) + .map_err(Error::IoErr)?, + Some( + AsyncFd::try_from(unsafe { OwnedFd::from_raw_fd(stderr_rd) }) + .map_err(Error::IoErr)?, + ), + ) + }; + + let child = command.spawn().map_err(Error::SpawnFail)?; + + let (mut stream, _) = listener.accept().await.map_err(Error::IoErr)?; + let buf = cmd.encode_to_vec(); + let size_buf = buf.len().to_le_bytes(); + stream.write_all(&size_buf).await.map_err(Error::IoErr)?; + stream.write_all(&buf).await.map_err(Error::IoErr)?; + stream.shutdown().await.map_err(Error::IoErr)?; + drop(stream); + drop(listener); + + let info = Session { + stdin: Arc::new(RwLock::new(stdin)), + stdout: Arc::new(RwLock::new(stdout)), + stderr: Arc::new(RwLock::new(stderr)), + child: Arc::new(RwLock::new(child)), + handles: Arc::new(RwLock::new(handles)), + }; + log::info!("Session {} started", &sid); + sessions.insert(sid, info); + Ok(()) } pub async fn stdin( &self, - _sid: &I, - mut _in_stream: impl Stream> + Unpin, + sid: &I, + mut in_stream: impl Stream> + Unpin, ) -> Result<(), Error> { - unimplemented!() + let cmd_stdin = self.get_session(sid, |s| s.stdin.clone()).await?; + let mut cmd_stdin = cmd_stdin.write().await; + log::debug!("Session {} stdin stream started", sid); + while let Some(item) = in_stream.next().await { + cmd_stdin.write_all(&item).await.map_err(Error::IoErr)?; + } + Ok(()) } - pub async fn stdout(&self, _sid: &I) -> Result>, Error> { - // Not implemented yet - let (_tx, rx) = mpsc::channel(10); + pub async fn stdout(&self, sid: &I) -> Result>, Error> { + let cmd_stdout = self.get_session(sid, |s| s.stdout.clone()).await?; + let (tx, rx) = mpsc::channel(10); + let sid_copy = sid.clone(); + let out_handle = tokio::spawn(async move { + let mut out = cmd_stdout.write().await; + log::debug!("Session {} stdout stream started", &sid_copy); + Self::copy_to(&mut *out, tx).await + }); + let handles = self.get_session(sid, |s| s.handles.clone()).await?; + handles.write().await.push(out_handle); let stream = ReceiverStream::new(rx); Ok(stream) } - pub async fn stderr(&self, _sid: &I) -> Result>, Error> { - // Not implemented yet - let (_tx, rx) = mpsc::channel(10); + pub async fn stderr(&self, sid: &I) -> Result>, Error> { + let cmd_stderr = self.get_session(sid, |s| s.stderr.clone()).await?; + let (tx, rx) = mpsc::channel(10); + let sid_copy = sid.clone(); + let err_handle = tokio::spawn(async move { + let mut err = cmd_stderr.write().await; + let Some(ref mut err) = &mut *err else { + log::info!("Session {} has no stderr", &sid_copy); + return Ok(()); + }; + log::debug!("Session {} stderr stream started", &sid_copy); + Self::copy_to(&mut *err, tx).await + }); + let handles = self.get_session(sid, |s| s.handles.clone()).await?; + handles.write().await.push(err_handle); let stream = ReceiverStream::new(rx); Ok(stream) } - pub async fn dial(&self, _sid: I) -> Result<(), Error> { - unimplemented!() + pub async fn dial(&self, sid: I) -> Result<(), Error> { + let mut pending = self.pending.write().await; + if pending.contains_key(&sid) { + return Err(Error::DuplicateId); + } + let session = PendingSession { + ninep: Arc::new(RwLock::new(None)), + }; + log::info!("Session {} created", &sid); + pending.insert(sid, session); + Ok(()) } - pub async fn wait(&self, _sid: &I) -> Result { - unimplemented!() + pub async fn wait(&self, sid: &I) -> Result { + let child = self.get_session(sid, |s| s.child.clone()).await?; + let ret = match child.write().await.wait().await { + Ok(status) => status.code().ok_or(Error::NoReturnCode), + Err(e) => Err(Error::CommandError(e)), + }; + println!("child is done"); + let handles = self.get_session(sid, |s| s.handles.clone()).await?; + for handle in handles.write().await.iter_mut() { + if let Err(e) = handle.await { + eprintln!("handle join error {:?}", e); + } + } + self.sessions.write().await.remove(sid); + log::info!("Session {} is done", &sid); + ret } pub async fn ninep_forward( @@ -71,3 +307,7 @@ impl Server { Ok(stream) } } + +pub fn rpc_based() -> crate::rpc::rpc_server::RpcServer { + crate::rpc::rpc_server::RpcServer {} +} diff --git a/crates/p9cpu/Cargo.toml b/crates/p9cpu/Cargo.toml index 1e6e140..2bfb3f2 100644 --- a/crates/p9cpu/Cargo.toml +++ b/crates/p9cpu/Cargo.toml @@ -13,3 +13,4 @@ clap = { version = "4", features = ["derive"] } anyhow = "1" tokio = { version = "1", features = ["macros", "rt-multi-thread","process","io-std"] } tokio-vsock = { version = "0.4", features = ["tonic-conn"] } +nix = "0.26.2" diff --git a/crates/p9cpu/src/p9cpu.rs b/crates/p9cpu/src/p9cpu.rs index 7c0c117..077537b 100644 --- a/crates/p9cpu/src/p9cpu.rs +++ b/crates/p9cpu/src/p9cpu.rs @@ -1,10 +1,9 @@ -use anyhow::{Result, bail}; +use anyhow::{bail, Result}; use clap::Parser; -use libp9cpu::parse_namespace; use libp9cpu::cmd::{Command, FsTab}; -use tokio::io::AsyncBufReadExt; +use libp9cpu::parse_namespace; use std::os::unix::prelude::OsStringExt; - +use tokio::io::AsyncBufReadExt; #[derive(clap::ValueEnum, Clone, Debug)] enum Net { @@ -34,6 +33,12 @@ struct Args { #[arg(long, default_value = "/tmp")] tmp_mnt: String, + #[arg(long, default_value_t = nix::unistd::geteuid().as_raw())] + uid: u32, + + #[arg(long, default_value_t = nix::unistd::getegid().as_raw())] + gid: u32, + #[arg()] host: String, @@ -77,6 +82,7 @@ async fn app(args: Args) -> Result<()> { cmd.fstab(fs_tab_lines); cmd.tty(args.tty); cmd.tmp_mnt(args.tmp_mnt); + cmd.ugid(args.uid, args.gid); let mut client = libp9cpu::client::rpc_based(addr).await?; client.start(cmd).await?; diff --git a/crates/p9cpud/Cargo.toml b/crates/p9cpud/Cargo.toml index b7f6096..a7359bd 100644 --- a/crates/p9cpud/Cargo.toml +++ b/crates/p9cpud/Cargo.toml @@ -13,3 +13,5 @@ anyhow = "1" libp9cpu = { version = "*", path = "../libp9cpu" } clap = { version = "4", features = ["derive"] } tokio = { version = "1", features = ["macros", "rt-multi-thread","process","io-std"] } +tokio-vsock = { version = "0.4", features = ["tonic-conn"] } +vsock = "0.3.0" diff --git a/crates/p9cpud/src/p9cpud.rs b/crates/p9cpud/src/p9cpud.rs index 8aa6b3b..b8e3194 100644 --- a/crates/p9cpud/src/p9cpud.rs +++ b/crates/p9cpud/src/p9cpud.rs @@ -1,3 +1,5 @@ +use libp9cpu::server::P9cpuServerT; + use anyhow::Result; use clap::Parser; @@ -18,10 +20,46 @@ struct Args { port: u32, #[arg(long)] uds: Option, + #[arg(long, hide = true)] + launch: Option, } -#[tokio::main] -async fn main() -> Result<()> { - let args = Args::parse(); - unimplemented!("not implemented: {:?}", args); +struct Launcher {} +impl libp9cpu::launcher::Launch for Launcher { + fn launch(&self, port: u16) -> std::process::Command { + let arg0 = std::env::args().next().unwrap(); + let mut sub_cmd = std::process::Command::new(arg0); + sub_cmd.arg("--launch").arg(port.to_string()); + sub_cmd + } +} + +async fn app(args: Args) -> Result<()> { + let addr = match args.net { + Net::Vsock => libp9cpu::Addr::Vsock(tokio_vsock::VsockAddr::new( + vsock::VMADDR_CID_ANY, + args.port, + )), + Net::Tcp => libp9cpu::Addr::Tcp(format!("[::]:{}", args.port).parse().unwrap()), + Net::Unix => libp9cpu::Addr::Uds(args.uds.unwrap()), + }; + let launcher = Launcher {}; + let server = libp9cpu::server::rpc_based(); + server.serve(addr, launcher).await?; + Ok(()) +} + +fn main() -> Result<()> { + let args = Args::parse(); + if let Some(port) = args.launch { + libp9cpu::launcher::connect(port)?; + return Ok(()); + } + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + let ret = runtime.block_on(app(args)); + ret }