From 0722897512e6029df8bef1fd3a331ae0f1c0ed12 Mon Sep 17 00:00:00 2001 From: Philip Metzger Date: Fri, 15 Dec 2023 19:14:18 +0100 Subject: [PATCH] run: Flesh out a bare implementation of `jj run`. This is basically a MVP based on `fix`, caching is not implemented yet. The core functionality is in `run_inner()` and `rewrite_commit()`. TODO: rewrite trees and expose all files --- Cargo.lock | 1 + cli/Cargo.toml | 1 + cli/src/commands/run.rs | 411 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 403 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 21a1b9323..23da2fe9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1862,6 +1862,7 @@ dependencies = [ "textwrap", "thiserror", "timeago", + "tokio", "toml_edit", "tracing", "tracing-chrome", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index ef261d178..d1e029112 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -85,6 +85,7 @@ tempfile = { workspace = true } textwrap = { workspace = true } thiserror = { workspace = true } timeago = { workspace = true } +tokio = { workspace = true } toml_edit = { workspace = true } tracing = { workspace = true } tracing-chrome = { workspace = true } diff --git a/cli/src/commands/run.rs b/cli/src/commands/run.rs index 0cabb833f..bdf133391 100644 --- a/cli/src/commands/run.rs +++ b/cli/src/commands/run.rs @@ -14,15 +14,349 @@ //! This file contains the internal implementation of `run`. -use itertools::Itertools as _; +use std::collections::HashMap; +use std::collections::HashSet; +use std::fs; +use std::fs::File; +use std::fs::OpenOptions; +use std::io; +use std::path::Path; +use std::path::PathBuf; +use std::process::ExitStatus; +use std::sync::mpsc::Sender; +use std::sync::Arc; + +use itertools::Itertools; +use jj_lib::backend::BackendError; +use jj_lib::backend::CommitId; +use jj_lib::backend::MergedTreeId; +use jj_lib::commit::Commit; +use jj_lib::commit::CommitIteratorExt; +use jj_lib::fsmonitor::FsmonitorSettings; +use jj_lib::gitignore::GitIgnoreFile; +use jj_lib::local_working_copy::TreeState; +use jj_lib::local_working_copy::TreeStateError; +use jj_lib::matchers::EverythingMatcher; +use jj_lib::object_id::ObjectId; +use jj_lib::repo_path::RepoPath; +use jj_lib::tree::Tree; +use jj_lib::working_copy::SnapshotOptions; +use pollster::FutureExt; +use tokio::runtime::Builder; +use tokio::sync::Mutex; +use tokio::task::JoinSet; use crate::cli_util::CommandHelper; use crate::cli_util::RevisionArg; -use crate::command_error::user_error; +use crate::cli_util::WorkspaceCommandTransaction; use crate::command_error::CommandError; use crate::ui::Ui; -/// (**Stub**, does not work yet) Run a command across a set of revisions. +#[derive(Debug, thiserror::Error)] +enum RunError { + #[error("failed to checkout commit")] + FailedCheckout, + #[error("the command failed {} for {}", 0, 1)] + CommandFailure(ExitStatus, CommitId), + #[error(transparent)] + IoError(#[from] io::Error), + #[error("failed to load a commits tree")] + TreeState(#[from] TreeStateError), + #[error(transparent)] + Backend(#[from] BackendError), +} + +impl From for CommandError { + fn from(value: RunError) -> Self { + CommandError::new(crate::command_error::CommandErrorKind::Cli, Box::new(value)) + } +} + +/// Creates the required directories for a StoredWorkingCopy. +/// Returns a tuple of (`output_dir`, `working_copy` and `state`). +fn create_working_copy_paths(path: &Path) -> Result<(PathBuf, PathBuf, PathBuf), std::io::Error> { + tracing::debug!(?path, "creating working copy paths for path"); + let output = path.join("output"); + let working_copy = path.join("working_copy"); + let state = path.join("state"); + std::fs::create_dir(&output)?; + std::fs::create_dir(&working_copy)?; + std::fs::create_dir(&state)?; + Ok((output, working_copy, state)) +} + +/// Represent a `MergeTreeId` in a way that it may be used as a working-copy +/// name. This makes no stability guarantee, as the format may change at +/// any time. +fn to_wc_name(id: &MergedTreeId) -> String { + match id { + MergedTreeId::Legacy(tree_id) => tree_id.hex(), + MergedTreeId::Merge(tree_ids) => { + let mut obfuscated = tree_ids + .map(|id| id.hex()) + .iter_mut() + .enumerate() + .map(|(i, s)| { + // Incredibly "smart" way to say, append "-" if the number is odd "+" + // otherwise. + if i & 1 != 0 { + s.push('-'); + } else { + s.push('+'); + } + s.to_owned() + }) + .collect::(); + // `PATH_MAX` could be a problem for different operating systems, so truncate + // it. + if obfuscated.len() >= 255 { + obfuscated.truncate(200); + } + obfuscated + } + } +} + +fn get_runtime(jobs: usize) -> tokio::runtime::Runtime { + let mut builder = Builder::new_multi_thread(); + builder.max_blocking_threads(jobs); + builder.build().unwrap() +} + +/// A commit stored under `.jj/run/default/` +// TODO: Create a caching backend, which creates these on a dedicated thread or +// threadpool. +struct OnDiskCommit { + /// Obfuscated name for an easier lookup. Not set for `StoredCommits` which + /// have no longer have the original commits `TreeState` set. + name: Option, + /// The respective commit unmodified. + commit: Commit, + /// The output directory of the commit, contains stdout and stderr for it + output_dir: PathBuf, + /// Self-explanatory + working_copy_dir: PathBuf, + /// Where the state is stored + state_dir: PathBuf, + /// The commits `TreeState`, which is loaded on creation and then replaced + /// if necessary. Protected by a Mutex for crossthread compatibility. + tree_state: Mutex, +} + +impl OnDiskCommit { + fn new( + name: Option, + commit: &Commit, + output_dir: PathBuf, + working_copy_dir: PathBuf, + state_dir: PathBuf, + tree_state: Mutex, + ) -> Self { + Self { + name, + commit: commit.clone(), + output_dir, + working_copy_dir, + tree_state, + state_dir, + } + } +} + +fn create_output_files(id: &CommitId, path: &Path) -> Result<(File, File), RunError> { + // We use the hex id of the commit here to allow multiple `std{in,err}`s to be + // placed beside each other in a single output directory. + tracing::debug!(?id, "creating output files (stdout, stderr) for commit "); + let stdout_path = path.join("output").join(format!("stdout.{}", id.hex())); + let stderr_path = path.join("output").join(format!("stderr.{}", id.hex())); + let mut file_options = OpenOptions::new(); + let stdout = file_options.write(true).create(true).open(stdout_path)?; + let stderr = file_options.write(true).create(true).open(stderr_path)?; + Ok((stdout, stderr)) +} + +fn create_working_copies( + repo_path: &Path, + commits: &[Commit], +) -> Result>, RunError> { + let mut results = vec![]; + // TODO: should be stored in a backend and not hardcoded. + // The parent() call is needed to not write under `.jj/repo/`. + let base_path = repo_path.parent().unwrap().join("run").join("default"); + if !base_path.exists() { + fs::create_dir_all(&base_path)?; + } + tracing::debug!(path = ?base_path, "creating working copies in path: "); + for commit in commits { + let name = to_wc_name(commit.tree_id()); + let commit_path = base_path.join(name.as_str()); + fs::create_dir(&commit_path)?; + tracing::debug!( + dir = ?commit_path, + commit = commit.id().hex(), + "creating directory for the commit" + ); + + let (output_dir, working_copy_dir, state_dir) = create_working_copy_paths(&commit_path)?; + let tree_state = { + tracing::debug!( + commit = commit.id().hex(), + "trying to create a treestate for commit" + ); + let mut tree_state = TreeState::init( + commit.store().clone(), + working_copy_dir.clone(), + state_dir.clone(), + )?; + tree_state + .check_out(&commit.tree()?) + .map_err(|_| RunError::FailedCheckout)?; + Mutex::new(tree_state) + }; + let stored_commit = OnDiskCommit::new( + Some(name), + commit, + output_dir, + working_copy_dir, + state_dir, + tree_state, + ); + results.push(Arc::new(stored_commit)); + } + Ok(results) +} + +/// Get the shell to execute in. +// TODO: use something like `[run].shell` +fn get_shell_executable() -> String { + if cfg!(target_os = "windows") { + "cmd /c".into() + } else { + "/bin/sh -c".into() + } +} + +/// The result of a single command invocation. +struct RunJob { + /// The old `CommitId` of the commit. + old_id: CommitId, + /// The new tree generated from the commit. + new_tree: Tree, +} + +// TODO: make this more revset/commit stream friendly. +async fn run_inner<'a>( + tx: &WorkspaceCommandTransaction<'a>, + sender: Sender, + handle: &tokio::runtime::Handle, + shell_command: Arc, + commits: Arc>>, +) -> Result<(), RunError> { + let mut command_futures = JoinSet::new(); + for commit in commits.iter() { + command_futures.spawn_on( + rewrite_commit( + tx.base_workspace_helper().base_ignores().unwrap().clone(), + commit.clone(), + shell_command.clone(), + ), + handle, + ); + } + + while let Some(res) = command_futures.join_next().await { + let done = res.unwrap().expect("should not fail joining a job"); + let should_quit = sender.send(done).is_err(); + if should_quit { + tracing::debug!( + ?should_quit, + "receiver is no longer available, exiting loop" + ); + break; + } + } + Ok(()) +} + +/// Rewrite a single `OnDiskCommit`. The caller is responsible for creating the +/// final commit. +async fn rewrite_commit<'a>( + base_ignores: Arc, + stored_commit: Arc, + shell_command: Arc, +) -> Result { + let (stdout, stderr) = + create_output_files(stored_commit.commit.id(), &stored_commit.output_dir)?; + // TODO: Later this should take some trait which allows `run` to integrate with + // something like Bazels RE protocol. + // e.g + // ``` + // let mut executor /* Arc */ = store.get_executor(); + // let command = executor.spawn(...)?; // RE or separate processes depending on impl. + // ... + // ``` + tracing::debug!( + "trying to run {shell_command} on commit {id}", + id = stored_commit.commit.id().hex(), + shell_command = shell_command.as_str() + ); + let mut command = tokio::process::Command::new(get_shell_executable()) + .arg(shell_command.as_str()) + // set cwd to the working copy directory. + .current_dir(&stored_commit.working_copy_dir) + // .arg() + // TODO: relativize + // .env("JJ_PATH", stored_commit.working_copy_dir) + .env("JJ_CHANGE", stored_commit.commit.change_id().hex()) + .env("JJ_COMMIT_ID", stored_commit.commit.id().hex()) + .stdout(stdout) + .stderr(stderr) + .kill_on_drop(true) // No zombies allowed. + .spawn()?; + + let commit = stored_commit.commit.clone(); + let old_id = commit.id().clone(); + + let status = command.wait().await?; + + if !status.success() { + return Err(RunError::CommandFailure(status, old_id.clone())); + } + + let tree_state = &mut stored_commit.tree_state.lock().await; + + let options = SnapshotOptions { + base_ignores, + // TODO: read from current wc/settings + start_tracking_matcher: &EverythingMatcher, + fsmonitor_settings: FsmonitorSettings::None, + progress: None, + // TODO: read from current wc/settings + max_new_file_size: 64_000_u64, // 64 MB for now, + }; + tracing::debug!("trying to snapshot the new tree"); + let dirty = tree_state.snapshot(&options).unwrap(); + if !dirty { + tracing::debug!( + "commit {:?} was not modified as the passed command did nothing", + commit.id() + ); + } + + let rewritten_id = tree_state.current_tree_id().to_merge(); + let new_id = rewritten_id.as_resolved().unwrap(); + + let new_tree = commit + .store() + .get_tree_async(RepoPath::root(), new_id) + .await?; + + // TODO: Serialize the new tree into /output/{id-tree} + + Ok(RunJob { old_id, new_tree }) +} + +/// Run a command across a set of revisions. /// /// /// All recorded state will be persisted in the `.jj` directory, so occasionally @@ -41,30 +375,87 @@ pub struct RunArgs { shell_command: String, /// The revisions to change. #[arg(long, short, default_value = "@")] - revisions: Vec, + revisions: RevisionArg, /// A no-op option to match the interface of `git rebase -x`. #[arg(short = 'x', hide = true)] - unused_command: bool, + exec: bool, /// How many processes should run in parallel, uses by default all cores. #[arg(long, short)] jobs: Option, } pub fn cmd_run(ui: &mut Ui, command: &CommandHelper, args: &RunArgs) -> Result<(), CommandError> { - let workspace_command = command.workspace_helper(ui)?; - let _resolved_commits: Vec<_> = workspace_command - .parse_union_revsets(ui, &args.revisions)? + let mut workspace_command = command.workspace_helper(ui)?; + // The commits are already returned in reverse topological order. + let resolved_commits: Vec<_> = workspace_command + .parse_revset(ui, &args.revisions)? .evaluate_to_commits()? .try_collect()?; // Jobs are resolved in this order: // 1. Commandline argument iff > 0. // 2. the amount of cores available. // 3. a single job, if all of the above fails. - let _jobs = match args.jobs { + let jobs = match args.jobs { Some(0) | None => std::thread::available_parallelism().map(|t| t.into()).ok(), Some(jobs) => Some(jobs), } // Fallback to a single user-visible job. .unwrap_or(1usize); - Err(user_error("This is a stub, do not use")) + + let rt = get_runtime(jobs); + let mut done_commits = HashSet::new(); + let (sender_tx, receiver) = std::sync::mpsc::channel(); + + let mut tx = workspace_command.start_transaction(); + let repo_path = tx.base_workspace_helper().repo_path(); + + // TODO: consider on-demand creation for the inner loop. + let stored_commits = Arc::new(create_working_copies(repo_path, &resolved_commits)?); + let stored_len = stored_commits.len(); + + // Start all the jobs. + async { + run_inner( + &tx, + sender_tx, + rt.handle(), + Arc::new(args.shell_command.clone()), + stored_commits, + ) + .await + } + .block_on()?; + + let mut rewritten_commits = HashMap::new(); + loop { + if let Ok(res) = receiver.recv() { + done_commits.insert(res.old_id.clone()); + rewritten_commits.insert(res.old_id.clone(), res.new_tree); + } + if rewritten_commits.len() == stored_len { + break; + } + } + drop(receiver); + + let mut count: u32 = 0; + // TODO: handle the `--reparent` case here. + tx.repo_mut().transform_descendants( + command.settings(), + resolved_commits.iter().ids().cloned().collect_vec(), + |rewriter| { + let old_id = rewriter.old_commit().id(); + let new_tree = rewritten_commits.get(old_id).unwrap(); + let new_tree_id = new_tree.id().clone(); + count += 1; + let builder = rewriter.rebase(command.settings())?; + builder + .set_tree_id(MergedTreeId::resolved(new_tree_id)) + .write()?; + Ok(()) + }, + )?; + tx.finish(ui, "run: rewrite {count} commits with {shell_command}")?; + + Ok(()) }