ok/jj (forked from mirrors/jj)

backend: make read functions async

The commit backend at Google is cloud-based (and so are the other
backends); it reads and writes commits from/to a server, which stores
them in a database. That makes latency much higher than for disk-based
backends. To reduce the latency, we have a local daemon process that
caches and prefetches objects. There are still many cases where
latency is high, such as when diffing two uncached commits. We can
improve that by changing some of our (jj's) algorithms to read many
objects concurrently from the backend. In the case of tree-diffing, we
can fetch one level (depth) of the tree at a time. There are several
ways of doing that:

 * Make the backend methods `async`
 * Use many threads for reading from the backend
 * Add backend methods for batch reading

I don't think we typically need CPU parallelism, so it's wasteful to
have hundreds of threads running in order to fetch hundreds of objects
in parallel (especially when using a synchronous backend like the Git
backend). Batching would work well for the tree-diffing case, but it's
not as composable as `async`. For example, if we wanted to fetch some
commits at the same time as we were doing a diff, it's hard to see how
to do that with batching. Using async seems like our best bet.
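
As a rough sketch of why async composes well here (not part of this change; `read_tree_level` is a made-up helper, only the `Backend::read_tree` signature is taken from this commit), a tree diff could issue all subtree reads for one level of the tree at once and await them together:

```rust
use futures::future::try_join_all;
use jj_lib::backend::{Backend, BackendResult, Tree, TreeId};
use jj_lib::repo_path::RepoPath;

// Hypothetical helper: read every subtree at one level of the tree
// concurrently. With a high-latency backend the requests overlap; with a
// synchronous backend (like the Git backend) they simply run one by one.
async fn read_tree_level(
    backend: &dyn Backend,
    level: &[(RepoPath, TreeId)],
) -> BackendResult<Vec<Tree>> {
    // try_join_all() polls all the futures together and fails fast on the
    // first error, so one level costs roughly one round-trip instead of N.
    try_join_all(level.iter().map(|(path, id)| backend.read_tree(path, id))).await
}
```

Batch-read methods could cover this particular case too, but they wouldn't help when, say, commit reads need to overlap with tree reads.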

I didn't make the backend interface's write functions async because
writes are already async with the daemon we have at Google. That
daemon will hash the object and immediately return, and then send the
object to the server in the background. I think any cloud-based
solution will need a similar daemon process. However, we may need to
reconsider this if/when jj gets used on a server with a custom backend
that writes directly to a database (i.e. no async daemon in between).

I've tried to measure the performance impact. The largest
difference I've been able to measure was on `jj diff
--ignore-working-copy -s --from v5.0 --to v6.0` in the Linux repo,
which increases from 749 ms to 773 ms (3.3%). In most cases I've
tested, there's no measurable difference. I've tried diffing from the
root commit, as well as `jj --ignore-working-copy log --no-graph -r
'::v3.0 & author(torvalds)' -T 'commit_id ++ "\n"'` (to test a
commit-heavy load).
Martin von Zweigbergk 2023-09-06 12:59:17 -07:00 committed by Martin von Zweigbergk
parent bd5eef9c5e
commit 5174489959
11 changed files with 131 additions and 70 deletions

Cargo.lock (generated)

@@ -990,6 +990,7 @@ dependencies = [
  "anyhow",
  "assert_cmd",
  "assert_matches",
+ "async-trait",
  "cargo_metadata",
  "chrono",
  "clap",
@@ -1036,6 +1037,7 @@ name = "jj-lib"
 version = "0.10.0"
 dependencies = [
  "assert_matches",
+ "async-trait",
  "backoff",
  "blake2",
  "byteorder",
@@ -1046,6 +1048,7 @@ dependencies = [
  "digest",
  "either",
  "esl01-renderdag",
+ "futures 0.3.28",
  "git2",
  "hex",
  "insta",
@@ -2083,6 +2086,7 @@ dependencies = [
 name = "testutils"
 version = "0.10.0"
 dependencies = [
+ "async-trait",
  "config",
  "git2",
  "itertools 0.11.0",


@@ -20,6 +20,7 @@ keywords = ["VCS", "DVCS", "SCM", "Git", "Mercurial"]
 anyhow = "1.0.75"
 assert_cmd = "2.0.8"
 assert_matches = "1.5.0"
+async-trait = "0.1.73"
 backoff = "0.4.0"
 blake2 = "0.10.6"
 byteorder = "1.5.0"
@@ -39,6 +40,7 @@ digest = "0.10.7"
 dirs = "5.0.1"
 either = "1.9.0"
 esl01-renderdag = "0.3.0"
+futures = "0.3.28"
 glob = "0.3.1"
 git2 = "0.17.2"
 hex = "0.4.3"


@@ -69,6 +69,7 @@ libc = { workspace = true }
 [dev-dependencies]
 anyhow = { workspace = true }
+async-trait = { workspace = true }
 assert_cmd = { workspace = true }
 assert_matches = { workspace = true }
 insta = { workspace = true }


@@ -16,6 +16,7 @@ use std::any::Any;
 use std::io::Read;
 use std::path::Path;
+use async_trait::async_trait;
 use jj_cli::cli_util::{CliRunner, CommandError, CommandHelper};
 use jj_cli::ui::Ui;
 use jj_lib::backend::{
@@ -86,6 +87,7 @@ impl JitBackend {
     }
 }
+#[async_trait]
 impl Backend for JitBackend {
     fn as_any(&self) -> &dyn Any {
         self
@@ -115,40 +117,40 @@ impl Backend for JitBackend {
         self.inner.empty_tree_id()
     }
-    fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
-        self.inner.read_file(path, id)
+    async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
+        self.inner.read_file(path, id).await
     }
     fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult<FileId> {
         self.inner.write_file(path, contents)
     }
-    fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult<String> {
-        self.inner.read_symlink(path, id)
+    async fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult<String> {
+        self.inner.read_symlink(path, id).await
     }
     fn write_symlink(&self, path: &RepoPath, target: &str) -> BackendResult<SymlinkId> {
         self.inner.write_symlink(path, target)
     }
-    fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult<Tree> {
-        self.inner.read_tree(path, id)
+    async fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult<Tree> {
+        self.inner.read_tree(path, id).await
     }
     fn write_tree(&self, path: &RepoPath, contents: &Tree) -> BackendResult<TreeId> {
         self.inner.write_tree(path, contents)
     }
-    fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict> {
-        self.inner.read_conflict(path, id)
+    async fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict> {
+        self.inner.read_conflict(path, id).await
     }
     fn write_conflict(&self, path: &RepoPath, contents: &Conflict) -> BackendResult<ConflictId> {
         self.inner.write_conflict(path, contents)
     }
-    fn read_commit(&self, id: &CommitId) -> BackendResult<Commit> {
-        self.inner.read_commit(id)
+    async fn read_commit(&self, id: &CommitId) -> BackendResult<Commit> {
+        self.inner.read_commit(id).await
     }
     fn write_commit(&self, contents: Commit) -> BackendResult<(CommitId, Commit)> {


@@ -19,6 +19,7 @@ harness = false
 version_check = { workspace = true }
 [dependencies]
+async-trait = { workspace = true }
 backoff = { workspace = true }
 blake2 = { workspace = true }
 byteorder = { workspace = true }
@@ -26,6 +27,7 @@ bytes = { workspace = true }
 chrono = { workspace = true }
 config = { workspace = true }
 digest = { workspace = true }
+futures = { workspace = true }
 either = { workspace = true }
 git2 = { workspace = true }
 hex = { workspace = true }
@@ -63,6 +65,7 @@ num_cpus = { workspace = true }
 pretty_assertions = { workspace = true }
 test-case = { workspace = true }
 testutils = { workspace = true }
+tokio = { workspace = true, features = ["full"] }
 [features]
 default = []


@@ -21,6 +21,7 @@ use std::io::Read;
 use std::result::Result;
 use std::vec::Vec;
+use async_trait::async_trait;
 use thiserror::Error;
 use crate::content_hash::ContentHash;
@@ -465,6 +466,7 @@ pub fn make_root_commit(root_change_id: ChangeId, empty_tree_id: TreeId) -> Comm
     }
 }
+#[async_trait]
 pub trait Backend: Send + Sync + Debug {
     fn as_any(&self) -> &dyn Any;
@@ -484,23 +486,23 @@ pub trait Backend: Send + Sync + Debug {
     fn empty_tree_id(&self) -> &TreeId;
-    fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>>;
+    async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>>;
     fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult<FileId>;
-    fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult<String>;
+    async fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult<String>;
     fn write_symlink(&self, path: &RepoPath, target: &str) -> BackendResult<SymlinkId>;
-    fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult<Tree>;
+    async fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult<Tree>;
     fn write_tree(&self, path: &RepoPath, contents: &Tree) -> BackendResult<TreeId>;
-    fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict>;
+    async fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict>;
     fn write_conflict(&self, path: &RepoPath, contents: &Conflict) -> BackendResult<ConflictId>;
-    fn read_commit(&self, id: &CommitId) -> BackendResult<Commit>;
+    async fn read_commit(&self, id: &CommitId) -> BackendResult<Commit>;
     /// Writes a commit and returns its ID and the commit itself. The commit
     /// should contain the data that was actually written, which may differ


@@ -22,6 +22,7 @@ use std::ops::Deref;
 use std::path::{Path, PathBuf};
 use std::sync::{Arc, Mutex, MutexGuard};
+use async_trait::async_trait;
 use git2::Oid;
 use itertools::Itertools;
 use prost::Message;
@@ -479,6 +480,7 @@ impl Debug for GitBackend {
     }
 }
+#[async_trait]
 impl Backend for GitBackend {
     fn as_any(&self) -> &dyn Any {
         self
@@ -508,7 +510,7 @@ impl Backend for GitBackend {
         &self.empty_tree_id
     }
-    fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
+    async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
         let git_blob_id = validate_git_object_id(id)?;
         let locked_repo = self.repo.lock().unwrap();
         let blob = locked_repo
@@ -531,7 +533,7 @@ impl Backend for GitBackend {
         Ok(FileId::new(oid.as_bytes().to_vec()))
     }
-    fn read_symlink(&self, _path: &RepoPath, id: &SymlinkId) -> Result<String, BackendError> {
+    async fn read_symlink(&self, _path: &RepoPath, id: &SymlinkId) -> Result<String, BackendError> {
         let git_blob_id = validate_git_object_id(id)?;
         let locked_repo = self.repo.lock().unwrap();
         let blob = locked_repo
@@ -558,7 +560,7 @@ impl Backend for GitBackend {
         Ok(SymlinkId::new(oid.as_bytes().to_vec()))
     }
-    fn read_tree(&self, _path: &RepoPath, id: &TreeId) -> BackendResult<Tree> {
+    async fn read_tree(&self, _path: &RepoPath, id: &TreeId) -> BackendResult<Tree> {
         if id == &self.empty_tree_id {
             return Ok(Tree::default());
         }
@@ -653,11 +655,13 @@ impl Backend for GitBackend {
         Ok(TreeId::from_bytes(oid.as_bytes()))
     }
-    fn read_conflict(&self, _path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict> {
-        let mut file = self.read_file(
-            &RepoPath::from_internal_string("unused"),
-            &FileId::new(id.to_bytes()),
-        )?;
+    async fn read_conflict(&self, _path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict> {
+        let mut file = self
+            .read_file(
+                &RepoPath::from_internal_string("unused"),
+                &FileId::new(id.to_bytes()),
+            )
+            .await?;
         let mut data = String::new();
         file.read_to_string(&mut data)
             .map_err(|err| BackendError::ReadObject {
@@ -690,7 +694,7 @@ impl Backend for GitBackend {
     }
     #[tracing::instrument(skip(self))]
-    fn read_commit(&self, id: &CommitId) -> BackendResult<Commit> {
+    async fn read_commit(&self, id: &CommitId) -> BackendResult<Commit> {
         if *id == self.root_commit_id {
             return Ok(make_root_commit(
                 self.root_change_id().clone(),
@@ -1005,7 +1009,7 @@ mod tests {
             .collect_vec();
         assert_eq!(git_refs, vec![git_commit_id2]);
-        let commit = backend.read_commit(&commit_id).unwrap();
+        let commit = futures::executor::block_on(backend.read_commit(&commit_id)).unwrap();
         assert_eq!(&commit.change_id, &change_id);
         assert_eq!(commit.parents, vec![CommitId::from_bytes(&[0; 20])]);
         assert_eq!(commit.predecessors, vec![]);
@@ -1034,12 +1038,11 @@ mod tests {
         );
         assert_eq!(commit.committer.timestamp.tz_offset, -480);
-        let root_tree = backend
-            .read_tree(
-                &RepoPath::root(),
-                &TreeId::from_bytes(root_tree_id.as_bytes()),
-            )
-            .unwrap();
+        let root_tree = futures::executor::block_on(backend.read_tree(
+            &RepoPath::root(),
+            &TreeId::from_bytes(root_tree_id.as_bytes()),
+        ))
+        .unwrap();
         let mut root_entries = root_tree.entries();
         let dir = root_entries.next().unwrap();
         assert_eq!(root_entries.next(), None);
@@ -1049,12 +1052,11 @@ mod tests {
            &TreeValue::Tree(TreeId::from_bytes(dir_tree_id.as_bytes()))
         );
-        let dir_tree = backend
-            .read_tree(
-                &RepoPath::from_internal_string("dir"),
-                &TreeId::from_bytes(dir_tree_id.as_bytes()),
-            )
-            .unwrap();
+        let dir_tree = futures::executor::block_on(backend.read_tree(
+            &RepoPath::from_internal_string("dir"),
+            &TreeId::from_bytes(dir_tree_id.as_bytes()),
+        ))
+        .unwrap();
         let mut entries = dir_tree.entries();
         let file = entries.next().unwrap();
         let symlink = entries.next().unwrap();
@@ -1073,7 +1075,7 @@ mod tests {
            &TreeValue::Symlink(SymlinkId::from_bytes(blob2.as_bytes()))
         );
-        let commit2 = backend.read_commit(&commit_id2).unwrap();
+        let commit2 = futures::executor::block_on(backend.read_commit(&commit_id2)).unwrap();
         assert_eq!(commit2.parents, vec![commit_id.clone()]);
         assert_eq!(commit.predecessors, vec![]);
         assert_eq!(
@@ -1112,9 +1114,10 @@ mod tests {
         // read_commit() without import_head_commits() works as of now. This might be
         // changed later.
-        assert!(backend
-            .read_commit(&CommitId::from_bytes(git_commit_id.as_bytes()))
-            .is_ok());
+        assert!(futures::executor::block_on(
+            backend.read_commit(&CommitId::from_bytes(git_commit_id.as_bytes()))
+        )
+        .is_ok());
         assert!(
             backend
                 .cached_extra_metadata_table()
@@ -1202,7 +1205,7 @@ mod tests {
         // Only root commit as parent
         commit.parents = vec![backend.root_commit_id().clone()];
         let first_id = backend.write_commit(commit.clone()).unwrap().0;
-        let first_commit = backend.read_commit(&first_id).unwrap();
+        let first_commit = futures::executor::block_on(backend.read_commit(&first_id)).unwrap();
         assert_eq!(first_commit, commit);
         let first_git_commit = git_repo.find_commit(git_id(&first_id)).unwrap();
         assert_eq!(first_git_commit.parent_ids().collect_vec(), vec![]);
@@ -1210,7 +1213,7 @@ mod tests {
         // Only non-root commit as parent
         commit.parents = vec![first_id.clone()];
         let second_id = backend.write_commit(commit.clone()).unwrap().0;
-        let second_commit = backend.read_commit(&second_id).unwrap();
+        let second_commit = futures::executor::block_on(backend.read_commit(&second_id)).unwrap();
         assert_eq!(second_commit, commit);
         let second_git_commit = git_repo.find_commit(git_id(&second_id)).unwrap();
         assert_eq!(
@@ -1221,7 +1224,7 @@ mod tests {
         // Merge commit
         commit.parents = vec![first_id.clone(), second_id.clone()];
         let merge_id = backend.write_commit(commit.clone()).unwrap().0;
-        let merge_commit = backend.read_commit(&merge_id).unwrap();
+        let merge_commit = futures::executor::block_on(backend.read_commit(&merge_id)).unwrap();
         assert_eq!(merge_commit, commit);
         let merge_git_commit = git_repo.find_commit(git_id(&merge_id)).unwrap();
         assert_eq!(
@@ -1271,7 +1274,8 @@ mod tests {
         // When writing a tree-level conflict, the root tree on the git side has the
         // individual trees as subtrees.
         let read_commit_id = backend.write_commit(commit.clone()).unwrap().0;
-        let read_commit = backend.read_commit(&read_commit_id).unwrap();
+        let read_commit =
+            futures::executor::block_on(backend.read_commit(&read_commit_id)).unwrap();
         assert_eq!(read_commit, commit);
         let git_commit = git_repo
             .find_commit(Oid::from_bytes(read_commit_id.as_bytes()).unwrap())
@@ -1300,7 +1304,8 @@ mod tests {
         // regular git tree.
         commit.root_tree = MergedTreeId::resolved(create_tree(5));
         let read_commit_id = backend.write_commit(commit.clone()).unwrap().0;
-        let read_commit = backend.read_commit(&read_commit_id).unwrap();
+        let read_commit =
+            futures::executor::block_on(backend.read_commit(&read_commit_id)).unwrap();
         assert_eq!(read_commit, commit);
         let git_commit = git_repo
             .find_commit(Oid::from_bytes(read_commit_id.as_bytes()).unwrap())
@@ -1365,7 +1370,10 @@ mod tests {
         // committer timestamp of the commit it actually writes.
         let (commit_id2, mut actual_commit2) = backend.write_commit(commit2.clone()).unwrap();
         // The returned matches the ID
-        assert_eq!(backend.read_commit(&commit_id2).unwrap(), actual_commit2);
+        assert_eq!(
+            futures::executor::block_on(backend.read_commit(&commit_id2)).unwrap(),
+            actual_commit2
+        );
         assert_ne!(commit_id2, commit_id1);
         // The committer timestamp should differ
         assert_ne!(


@@ -21,6 +21,7 @@ use std::fs::File;
 use std::io::{Read, Write};
 use std::path::{Path, PathBuf};
+use async_trait::async_trait;
 use blake2::{Blake2b512, Digest};
 use prost::Message;
 use tempfile::NamedTempFile;
@@ -114,6 +115,7 @@ impl LocalBackend {
     }
 }
+#[async_trait]
 impl Backend for LocalBackend {
     fn as_any(&self) -> &dyn Any {
         self
@@ -143,7 +145,7 @@ impl Backend for LocalBackend {
         &self.empty_tree_id
     }
-    fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
+    async fn read_file(&self, _path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
         let path = self.file_path(id);
         let file = File::open(path).map_err(|err| map_not_found_err(err, id))?;
         Ok(Box::new(zstd::Decoder::new(file).map_err(to_other_err)?))
@@ -171,7 +173,7 @@ impl Backend for LocalBackend {
         Ok(id)
     }
-    fn read_symlink(&self, _path: &RepoPath, id: &SymlinkId) -> Result<String, BackendError> {
+    async fn read_symlink(&self, _path: &RepoPath, id: &SymlinkId) -> Result<String, BackendError> {
         let path = self.symlink_path(id);
         let target = fs::read_to_string(path).map_err(|err| map_not_found_err(err, id))?;
         Ok(target)
@@ -191,7 +193,7 @@ impl Backend for LocalBackend {
         Ok(id)
     }
-    fn read_tree(&self, _path: &RepoPath, id: &TreeId) -> BackendResult<Tree> {
+    async fn read_tree(&self, _path: &RepoPath, id: &TreeId) -> BackendResult<Tree> {
         let path = self.tree_path(id);
         let buf = fs::read(path).map_err(|err| map_not_found_err(err, id))?;
@@ -215,7 +217,7 @@ impl Backend for LocalBackend {
         Ok(id)
     }
-    fn read_conflict(&self, _path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict> {
+    async fn read_conflict(&self, _path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict> {
         let path = self.conflict_path(id);
         let buf = fs::read(path).map_err(|err| map_not_found_err(err, id))?;
@@ -239,7 +241,7 @@ impl Backend for LocalBackend {
         Ok(id)
     }
-    fn read_commit(&self, id: &CommitId) -> BackendResult<Commit> {
+    async fn read_commit(&self, id: &CommitId) -> BackendResult<Commit> {
         if *id == self.root_commit_id {
             return Ok(make_root_commit(
                 self.root_change_id().clone(),
@@ -486,25 +488,26 @@ mod tests {
         // Only root commit as parent
         commit.parents = vec![backend.root_commit_id().clone()];
         let first_id = backend.write_commit(commit.clone()).unwrap().0;
-        let first_commit = backend.read_commit(&first_id).unwrap();
+        let first_commit = futures::executor::block_on(backend.read_commit(&first_id)).unwrap();
         assert_eq!(first_commit, commit);
         // Only non-root commit as parent
         commit.parents = vec![first_id.clone()];
         let second_id = backend.write_commit(commit.clone()).unwrap().0;
-        let second_commit = backend.read_commit(&second_id).unwrap();
+        let second_commit = futures::executor::block_on(backend.read_commit(&second_id)).unwrap();
         assert_eq!(second_commit, commit);
         // Merge commit
         commit.parents = vec![first_id.clone(), second_id.clone()];
         let merge_id = backend.write_commit(commit.clone()).unwrap().0;
-        let merge_commit = backend.read_commit(&merge_id).unwrap();
+        let merge_commit = futures::executor::block_on(backend.read_commit(&merge_id)).unwrap();
         assert_eq!(merge_commit, commit);
         // Merge commit with root as one parent
         commit.parents = vec![first_id, backend.root_commit_id().clone()];
         let root_merge_id = backend.write_commit(commit.clone()).unwrap().0;
-        let root_merge_commit = backend.read_commit(&root_merge_id).unwrap();
+        let root_merge_commit =
+            futures::executor::block_on(backend.read_commit(&root_merge_id)).unwrap();
         assert_eq!(root_merge_commit, commit);
     }


@@ -97,18 +97,22 @@ impl Store {
     }
     pub fn get_commit(self: &Arc<Self>, id: &CommitId) -> BackendResult<Commit> {
-        let data = self.get_backend_commit(id)?;
+        futures::executor::block_on(self.get_commit_async(id))
+    }
+    pub async fn get_commit_async(self: &Arc<Self>, id: &CommitId) -> BackendResult<Commit> {
+        let data = self.get_backend_commit(id).await?;
         Ok(Commit::new(self.clone(), id.clone(), data))
     }
-    fn get_backend_commit(&self, id: &CommitId) -> BackendResult<Arc<backend::Commit>> {
+    async fn get_backend_commit(&self, id: &CommitId) -> BackendResult<Arc<backend::Commit>> {
         {
             let read_locked_cached = self.commit_cache.read().unwrap();
             if let Some(data) = read_locked_cached.get(id).cloned() {
                 return Ok(data);
             }
         }
-        let commit = self.backend.read_commit(id)?;
+        let commit = self.backend.read_commit(id).await?;
         let data = Arc::new(commit);
         let mut write_locked_cache = self.commit_cache.write().unwrap();
         write_locked_cache.insert(id.clone(), data.clone());
@@ -128,11 +132,23 @@ impl Store {
     }
     pub fn get_tree(self: &Arc<Self>, dir: &RepoPath, id: &TreeId) -> BackendResult<Tree> {
-        let data = self.get_backend_tree(dir, id)?;
+        futures::executor::block_on(self.get_tree_async(dir, id))
+    }
+    pub async fn get_tree_async(
+        self: &Arc<Self>,
+        dir: &RepoPath,
+        id: &TreeId,
+    ) -> BackendResult<Tree> {
+        let data = self.get_backend_tree(dir, id).await?;
         Ok(Tree::new(self.clone(), dir.clone(), id.clone(), data))
     }
-    fn get_backend_tree(&self, dir: &RepoPath, id: &TreeId) -> BackendResult<Arc<backend::Tree>> {
+    async fn get_backend_tree(
+        &self,
+        dir: &RepoPath,
+        id: &TreeId,
+    ) -> BackendResult<Arc<backend::Tree>> {
         let key = (dir.clone(), id.clone());
         {
             let read_locked_cache = self.tree_cache.read().unwrap();
@@ -140,7 +156,8 @@ impl Store {
                 return Ok(data);
             }
         }
-        let data = Arc::new(self.backend.read_tree(dir, id)?);
+        let data = self.backend.read_tree(dir, id).await?;
+        let data = Arc::new(data);
         let mut write_locked_cache = self.tree_cache.write().unwrap();
         write_locked_cache.insert(key, data.clone());
         Ok(data)
@@ -175,7 +192,15 @@ impl Store {
     }
     pub fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
-        self.backend.read_file(path, id)
+        futures::executor::block_on(self.read_file_async(path, id))
+    }
+    pub async fn read_file_async(
+        &self,
+        path: &RepoPath,
+        id: &FileId,
+    ) -> BackendResult<Box<dyn Read>> {
+        self.backend.read_file(path, id).await
     }
     pub fn write_file(&self, path: &RepoPath, contents: &mut dyn Read) -> BackendResult<FileId> {
@@ -183,7 +208,15 @@ impl Store {
     }
     pub fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> BackendResult<String> {
-        self.backend.read_symlink(path, id)
+        futures::executor::block_on(self.read_symlink_async(path, id))
+    }
+    pub async fn read_symlink_async(
+        &self,
+        path: &RepoPath,
+        id: &SymlinkId,
+    ) -> BackendResult<String> {
+        self.backend.read_symlink(path, id).await
     }
     pub fn write_symlink(&self, path: &RepoPath, contents: &str) -> BackendResult<SymlinkId> {
@@ -195,7 +228,7 @@ impl Store {
         path: &RepoPath,
         id: &ConflictId,
     ) -> BackendResult<Merge<Option<TreeValue>>> {
-        let backend_conflict = self.backend.read_conflict(path, id)?;
+        let backend_conflict = futures::executor::block_on(self.backend.read_conflict(path, id))?;
         Ok(Merge::from_backend_conflict(backend_conflict))
     }
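
To illustrate how this split is meant to be used (a hedged sketch; `fetch_trees_for_diff` is a hypothetical caller, only `Store::get_tree_async` and the blocking `Store::get_tree` wrapper come from the diff above), existing synchronous call sites keep working through `futures::executor::block_on`, while async code can overlap several reads:

```rust
use std::sync::Arc;

use futures::try_join;
use jj_lib::backend::{BackendResult, TreeId};
use jj_lib::repo_path::RepoPath;
use jj_lib::store::Store;
use jj_lib::tree::Tree;

// Hypothetical caller: fetch the two sides of a diff concurrently through the
// new async entry points, instead of blocking on each read in turn.
async fn fetch_trees_for_diff(
    store: &Arc<Store>,
    dir: &RepoPath,
    from_id: &TreeId,
    to_id: &TreeId,
) -> BackendResult<(Tree, Tree)> {
    // Both backend reads are in flight at the same time; the Store's tree
    // cache still applies to each of them individually.
    try_join!(
        store.get_tree_async(dir, from_id),
        store.get_tree_async(dir, to_id)
    )
}
```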


@@ -15,6 +15,7 @@ readme = { workspace = true }
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 [dependencies]
+async-trait = { workspace = true }
 config = { workspace = true }
 git2 = { workspace = true }
 itertools = { workspace = true }


@@ -19,6 +19,7 @@ use std::io::{Cursor, Read};
 use std::path::{Path, PathBuf};
 use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
+use async_trait::async_trait;
 use jj_lib::backend::{
     make_root_commit, Backend, BackendError, BackendResult, ChangeId, Commit, CommitId, Conflict,
     ConflictId, FileId, ObjectId, SymlinkId, Tree, TreeId,
@@ -107,6 +108,7 @@ impl Debug for TestBackend {
     }
 }
+#[async_trait]
 impl Backend for TestBackend {
     fn as_any(&self) -> &dyn Any {
         self
@@ -136,7 +138,7 @@ impl Backend for TestBackend {
         &self.empty_tree_id
     }
-    fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
+    async fn read_file(&self, path: &RepoPath, id: &FileId) -> BackendResult<Box<dyn Read>> {
         match self
             .locked_data()
             .files
@@ -165,7 +167,7 @@ impl Backend for TestBackend {
         Ok(id)
     }
-    fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> Result<String, BackendError> {
+    async fn read_symlink(&self, path: &RepoPath, id: &SymlinkId) -> Result<String, BackendError> {
         match self
             .locked_data()
             .symlinks
@@ -192,7 +194,7 @@ impl Backend for TestBackend {
         Ok(id)
     }
-    fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult<Tree> {
+    async fn read_tree(&self, path: &RepoPath, id: &TreeId) -> BackendResult<Tree> {
         if id == &self.empty_tree_id {
             return Ok(Tree::default());
         }
@@ -222,7 +224,7 @@ impl Backend for TestBackend {
         Ok(id)
     }
-    fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict> {
+    async fn read_conflict(&self, path: &RepoPath, id: &ConflictId) -> BackendResult<Conflict> {
         match self
             .locked_data()
             .conflicts
@@ -249,7 +251,7 @@ impl Backend for TestBackend {
         Ok(id)
     }
-    fn read_commit(&self, id: &CommitId) -> BackendResult<Commit> {
+    async fn read_commit(&self, id: &CommitId) -> BackendResult<Commit> {
         if id == &self.root_commit_id {
             return Ok(make_root_commit(
                 self.root_change_id.clone(),