
op_heads_store: refactor into an interface and simple implementation

The implementation has some hoops to jump through because Rust does not allow
`self: &Arc<Self>` as a receiver on trait methods, and two of the OpHeadsStore
functions need to return cloned selves. This is worked around by making the
implementation type itself a thin wrapper around an inner Arc<>.

This is not particularly noteworthy for the current implementation, where the
only data copied is a PathBuf, but for extensions it is likely to be more
critical that the lifetime of the OpHeadsStore is properly maintained.
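
As a minimal sketch of that workaround (hypothetical names, not the actual jj
types): the trait method can only take `&self`, so the public type wraps its
state in an inner Arc and clones that Arc whenever a returned value must keep
the store alive.

use std::sync::Arc;

// Per the commit message, `self: &Arc<Self>` is not allowed on trait
// methods, so a trait method cannot clone an Arc of the implementing type.
trait Store: Send + Sync {
    fn lock(&self) -> Locked;
}

// The returned guard must keep the underlying store alive.
struct Locked {
    _store: Arc<Inner>,
}

// All shared state lives in an inner struct...
struct Inner {
    // e.g. a PathBuf in the simple implementation
}

// ...and the public type is a thin wrapper around Arc<Inner>, so any
// &self method can hand out owned clones of the inner Arc.
struct SimpleStore {
    inner: Arc<Inner>,
}

impl Store for SimpleStore {
    fn lock(&self) -> Locked {
        Locked {
            _store: self.inner.clone(),
        }
    }
}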
Daniel Ploch 2022-12-15 17:13:00 -05:00 committed by Daniel Ploch
parent af85f552b6
commit 309a3f91a1
5 changed files with 271 additions and 164 deletions


@@ -48,6 +48,7 @@ pub mod revset;
pub mod revset_graph_iterator;
pub mod rewrite;
pub mod settings;
pub mod simple_op_heads_store;
pub mod simple_op_store;
#[cfg(feature = "legacy-thrift")]
mod simple_op_store_model;


@@ -12,24 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::path::PathBuf;
use std::fmt::Debug;
use std::sync::Arc;
use itertools::Itertools;
use thiserror::Error;
use crate::lock::FileLock;
use crate::op_store::{OpStore, OperationId, OperationMetadata};
use crate::op_store::{OpStore, OperationId};
use crate::operation::Operation;
use crate::{dag_walk, op_store};
/// Manages the set of current heads of the operation log. The store is
/// simply a directory where each operation id is a file with that name (and no
/// content).
pub struct OpHeadsStore {
dir: PathBuf,
}
pub enum OpHeads {
/// There's a single latest operation. This is the normal case.
@@ -48,157 +37,39 @@ pub enum OpHeadResolutionError {
NoHeads,
}
pub trait LockedOpHeadsResolver {
fn finish(&self, new_op: &Operation);
}
// Represents a mutually exclusive lock on the OpHeadsStore in local systems.
pub struct LockedOpHeads {
store: Arc<OpHeadsStore>,
_lock: FileLock,
resolver: Box<dyn LockedOpHeadsResolver>,
}
impl LockedOpHeads {
pub fn new(resolver: Box<dyn LockedOpHeadsResolver>) -> Self {
LockedOpHeads { resolver }
}
pub fn finish(self, new_op: &Operation) {
self.store.add_op_head(new_op.id());
for old_id in new_op.parent_ids() {
self.store.remove_op_head(old_id);
}
self.resolver.finish(new_op);
}
}
impl OpHeadsStore {
pub fn init(
dir: PathBuf,
op_store: &Arc<dyn OpStore>,
root_view: &op_store::View,
operation_metadata: OperationMetadata,
) -> (Self, Operation) {
let root_view_id = op_store.write_view(root_view).unwrap();
let init_operation = op_store::Operation {
view_id: root_view_id,
parents: vec![],
metadata: operation_metadata,
};
let init_operation_id = op_store.write_operation(&init_operation).unwrap();
let init_operation = Operation::new(op_store.clone(), init_operation_id, init_operation);
/// Manages the set of current heads of the operation log.
///
/// Implementations should use Arc<> internally, as lock() and get_heads()
/// return values that might outlive the original object. When Rust makes it
/// possible for a trait method to take `self: &Arc<Self>`, this can be
/// simplified.
pub trait OpHeadsStore: Send + Sync + Debug {
fn add_op_head(&self, id: &OperationId);
let op_heads_store = OpHeadsStore { dir };
op_heads_store.add_op_head(init_operation.id());
(op_heads_store, init_operation)
}
fn remove_op_head(&self, id: &OperationId);
pub fn load(dir: PathBuf) -> OpHeadsStore {
OpHeadsStore { dir }
}
fn get_op_heads(&self) -> Vec<OperationId>;
fn add_op_head(&self, id: &OperationId) {
std::fs::write(self.dir.join(id.hex()), "").unwrap();
}
fn lock(&self) -> LockedOpHeads;
fn remove_op_head(&self, id: &OperationId) {
// It's fine if the old head was not found. It probably means
// that we're on a distributed file system where the locking
// doesn't work. We'll probably end up with two current
// heads. We'll detect that next time we load the view.
std::fs::remove_file(self.dir.join(id.hex())).ok();
}
pub fn get_op_heads(&self) -> Vec<OperationId> {
let mut op_heads = vec![];
for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
let op_head_file_name = op_head_entry.unwrap().file_name();
let op_head_file_name = op_head_file_name.to_str().unwrap();
if let Ok(op_head) = hex::decode(op_head_file_name) {
op_heads.push(OperationId::new(op_head));
}
}
op_heads
}
pub fn lock(self: &Arc<Self>) -> LockedOpHeads {
let lock = FileLock::lock(self.dir.join("lock"));
LockedOpHeads {
store: self.clone(),
_lock: lock,
}
}
pub fn get_heads(
self: &Arc<Self>,
op_store: &Arc<dyn OpStore>,
) -> Result<OpHeads, OpHeadResolutionError> {
let mut op_heads = self.get_op_heads();
if op_heads.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_heads.len() == 1 {
let operation_id = op_heads.pop().unwrap();
let operation = op_store.read_operation(&operation_id).unwrap();
return Ok(OpHeads::Single(Operation::new(
op_store.clone(),
operation_id,
operation,
)));
}
// There are multiple heads. We take a lock, then check if there are still
// multiple heads (it's likely that another process was in the process of
// deleting one of them). If there are still multiple heads, we attempt to
// merge all the views into one. We then write that view and a corresponding
// operation to the op-store.
// Note that the locking isn't necessary for correctness; we take the lock
// only to prevent other concurrent processes from doing the same work (and
// producing another set of divergent heads).
let locked_op_heads = self.lock();
let op_head_ids = self.get_op_heads();
if op_head_ids.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_head_ids.len() == 1 {
let op_head_id = op_head_ids[0].clone();
let op_head = op_store.read_operation(&op_head_id).unwrap();
// Return early so we don't write a merge operation with a single parent
return Ok(OpHeads::Single(Operation::new(
op_store.clone(),
op_head_id,
op_head,
)));
}
let op_heads = op_head_ids
.iter()
.map(|op_id: &OperationId| {
let data = op_store.read_operation(op_id).unwrap();
Operation::new(op_store.clone(), op_id.clone(), data)
})
.collect_vec();
let mut op_heads = self.handle_ancestor_ops(op_heads);
// Return without creating a merge operation
if op_heads.len() == 1 {
return Ok(OpHeads::Single(op_heads.pop().unwrap()));
}
op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
Ok(OpHeads::Unresolved {
locked_op_heads,
op_heads,
})
}
/// Removes operations in the input that are ancestors of other operations
/// in the input. The ancestors are removed both from the list and from
/// disk.
fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> Vec<Operation> {
let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
let neighbors_fn = |op: &Operation| op.parents();
// Remove ancestors so we don't create a merge operation with an operation and
// ancestor
let op_heads = dag_walk::heads(op_heads, &neighbors_fn, &|op: &Operation| op.id().clone());
let op_head_ids_after: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
for removed_op_head in op_head_ids_before.difference(&op_head_ids_after) {
self.remove_op_head(removed_op_head);
}
op_heads.into_iter().collect()
}
fn get_heads(&self, op_store: &Arc<dyn OpStore>) -> Result<OpHeads, OpHeadResolutionError>;
}
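
Based only on the signatures above, a caller of the new interface might look
like the following sketch (commit_operation is a hypothetical helper, not part
of this commit; imports as in the file above):

// Hypothetical caller of the new trait-object API.
fn commit_operation(op_heads_store: &Arc<dyn OpHeadsStore>, new_op: &Operation) {
    // Take the exclusive lock; the implementation supplies the
    // Box<dyn LockedOpHeadsResolver> stored inside LockedOpHeads.
    let locked = op_heads_store.lock();
    // finish() consumes the lock and delegates to the resolver, which (in
    // the simple implementation) records new_op as a head and removes its
    // parents from the head set.
    locked.finish(new_op);
}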


@@ -37,6 +37,7 @@ use crate::op_store::{BranchTarget, OpStore, OperationId, RefTarget, WorkspaceId
use crate::operation::Operation;
use crate::rewrite::DescendantRebaser;
use crate::settings::{RepoSettings, UserSettings};
use crate::simple_op_heads_store::SimpleOpHeadsStore;
use crate::simple_op_store::SimpleOpStore;
use crate::store::Store;
use crate::transaction::Transaction;
@@ -92,7 +93,7 @@ pub struct ReadonlyRepo {
repo_path: PathBuf,
store: Arc<Store>,
op_store: Arc<dyn OpStore>,
op_heads_store: Arc<OpHeadsStore>,
op_heads_store: Arc<dyn OpHeadsStore>,
operation: Operation,
settings: RepoSettings,
index_store: Arc<IndexStore>,
@@ -148,7 +149,7 @@ impl ReadonlyRepo {
let operation_metadata =
crate::transaction::create_op_metadata(user_settings, "initialize repo".to_string());
let (op_heads_store, init_op) =
OpHeadsStore::init(op_heads_path, &op_store, &root_view, operation_metadata);
SimpleOpHeadsStore::init(op_heads_path, &op_store, &root_view, operation_metadata);
let op_heads_store = Arc::new(op_heads_store);
let index_path = repo_path.join("index");
@@ -225,7 +226,7 @@ impl ReadonlyRepo {
&self.op_store
}
pub fn op_heads_store(&self) -> &Arc<OpHeadsStore> {
pub fn op_heads_store(&self) -> &Arc<dyn OpHeadsStore> {
&self.op_heads_store
}
@@ -392,7 +393,7 @@ pub struct RepoLoader {
repo_settings: RepoSettings,
store: Arc<Store>,
op_store: Arc<dyn OpStore>,
op_heads_store: Arc<OpHeadsStore>,
op_heads_store: Arc<dyn OpHeadsStore>,
index_store: Arc<IndexStore>,
}
@@ -405,7 +406,7 @@ impl RepoLoader {
let store = Store::new(store_factories.load_backend(&repo_path.join("store")));
let repo_settings = user_settings.with_repo(repo_path).unwrap();
let op_store = Arc::from(store_factories.load_op_store(&repo_path.join("op_store")));
let op_heads_store = Arc::new(OpHeadsStore::load(repo_path.join("op_heads")));
let op_heads_store = Arc::new(SimpleOpHeadsStore::load(repo_path.join("op_heads")));
let index_store = Arc::new(IndexStore::load(repo_path.join("index")));
Self {
repo_path: repo_path.to_path_buf(),
@@ -433,7 +434,7 @@ impl RepoLoader {
&self.op_store
}
pub fn op_heads_store(&self) -> &Arc<OpHeadsStore> {
pub fn op_heads_store(&self) -> &Arc<dyn OpHeadsStore> {
&self.op_heads_store
}


@@ -0,0 +1,234 @@
// Copyright 2021-2022 The Jujutsu Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::sync::Arc;
use itertools::Itertools;
use crate::lock::FileLock;
use crate::op_heads_store::{
LockedOpHeads, LockedOpHeadsResolver, OpHeadResolutionError, OpHeads, OpHeadsStore,
};
use crate::op_store::{OpStore, OperationId, OperationMetadata};
use crate::operation::Operation;
use crate::{dag_walk, op_store};
pub struct SimpleOpHeadsStore {
store: Arc<InnerSimpleOpHeadsStore>,
}
impl Debug for SimpleOpHeadsStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SimpleOpHeadsStore")
.field("dir", &self.store.dir)
.finish()
}
}
/// Manages the set of current heads of the operation log. This store is
/// simply a directory where each operation id is a file with that name (and no
/// content).
struct InnerSimpleOpHeadsStore {
dir: PathBuf,
}
struct SimpleOpHeadsStoreLockResolver {
store: Arc<InnerSimpleOpHeadsStore>,
_lock: FileLock,
}
impl LockedOpHeadsResolver for SimpleOpHeadsStoreLockResolver {
fn finish(&self, new_op: &Operation) {
self.store.add_op_head(new_op.id());
for old_id in new_op.parent_ids() {
self.store.remove_op_head(old_id);
}
}
}
impl InnerSimpleOpHeadsStore {
pub fn init(
dir: PathBuf,
op_store: &Arc<dyn OpStore>,
root_view: &op_store::View,
operation_metadata: OperationMetadata,
) -> (Self, Operation) {
let root_view_id = op_store.write_view(root_view).unwrap();
let init_operation = op_store::Operation {
view_id: root_view_id,
parents: vec![],
metadata: operation_metadata,
};
let init_operation_id = op_store.write_operation(&init_operation).unwrap();
let init_operation = Operation::new(op_store.clone(), init_operation_id, init_operation);
let op_heads_store = InnerSimpleOpHeadsStore { dir };
op_heads_store.add_op_head(init_operation.id());
(op_heads_store, init_operation)
}
pub fn add_op_head(&self, id: &OperationId) {
std::fs::write(self.dir.join(id.hex()), "").unwrap();
}
pub fn remove_op_head(&self, id: &OperationId) {
// It's fine if the old head was not found. It probably means
// that we're on a distributed file system where the locking
// doesn't work. We'll probably end up with two current
// heads. We'll detect that next time we load the view.
std::fs::remove_file(self.dir.join(id.hex())).ok();
}
pub fn get_op_heads(&self) -> Vec<OperationId> {
let mut op_heads = vec![];
for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
let op_head_file_name = op_head_entry.unwrap().file_name();
let op_head_file_name = op_head_file_name.to_str().unwrap();
if let Ok(op_head) = hex::decode(op_head_file_name) {
op_heads.push(OperationId::new(op_head));
}
}
op_heads
}
/// Removes operations in the input that are ancestors of other operations
/// in the input. The ancestors are removed both from the list and from
/// disk.
/// TODO: Move this into the OpStore trait for sharing
fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> Vec<Operation> {
let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
let neighbors_fn = |op: &Operation| op.parents();
// Remove ancestors so we don't create a merge operation with an operation and
// ancestor
let op_heads = dag_walk::heads(op_heads, &neighbors_fn, &|op: &Operation| op.id().clone());
let op_head_ids_after: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
for removed_op_head in op_head_ids_before.difference(&op_head_ids_after) {
self.remove_op_head(removed_op_head);
}
op_heads.into_iter().collect()
}
}
impl SimpleOpHeadsStore {
pub fn init(
dir: PathBuf,
op_store: &Arc<dyn OpStore>,
root_view: &op_store::View,
operation_metadata: OperationMetadata,
) -> (Self, Operation) {
let (inner, init_op) =
InnerSimpleOpHeadsStore::init(dir, op_store, root_view, operation_metadata);
(
SimpleOpHeadsStore {
store: Arc::new(inner),
},
init_op,
)
}
pub fn load(dir: PathBuf) -> Self {
SimpleOpHeadsStore {
store: Arc::new(InnerSimpleOpHeadsStore { dir }),
}
}
}
impl OpHeadsStore for SimpleOpHeadsStore {
fn add_op_head(&self, id: &OperationId) {
self.store.add_op_head(id);
}
fn remove_op_head(&self, id: &OperationId) {
self.store.remove_op_head(id);
}
fn get_op_heads(&self) -> Vec<OperationId> {
self.store.get_op_heads()
}
fn lock(&self) -> LockedOpHeads {
let lock = FileLock::lock(self.store.dir.join("lock"));
LockedOpHeads::new(Box::new(SimpleOpHeadsStoreLockResolver {
store: self.store.clone(),
_lock: lock,
}))
}
fn get_heads(&self, op_store: &Arc<dyn OpStore>) -> Result<OpHeads, OpHeadResolutionError> {
let mut op_heads = self.get_op_heads();
if op_heads.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_heads.len() == 1 {
let operation_id = op_heads.pop().unwrap();
let operation = op_store.read_operation(&operation_id).unwrap();
return Ok(OpHeads::Single(Operation::new(
op_store.clone(),
operation_id,
operation,
)));
}
// There are multiple heads. We take a lock, then check if there are still
// multiple heads (it's likely that another process was in the process of
// deleting one of them). If there are still multiple heads, we attempt to
// merge all the views into one. We then write that view and a corresponding
// operation to the op-store.
// Note that the locking isn't necessary for correctness; we take the lock
// only to prevent other concurrent processes from doing the same work (and
// producing another set of divergent heads).
let locked_op_heads = self.lock();
let op_head_ids = self.get_op_heads();
if op_head_ids.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_head_ids.len() == 1 {
let op_head_id = op_head_ids[0].clone();
let op_head = op_store.read_operation(&op_head_id).unwrap();
// Return early so we don't write a merge operation with a single parent
return Ok(OpHeads::Single(Operation::new(
op_store.clone(),
op_head_id,
op_head,
)));
}
let op_heads = op_head_ids
.iter()
.map(|op_id: &OperationId| {
let data = op_store.read_operation(op_id).unwrap();
Operation::new(op_store.clone(), op_id.clone(), data)
})
.collect_vec();
let mut op_heads = self.store.handle_ancestor_ops(op_heads);
// Return without creating a merge operation
if op_heads.len() == 1 {
return Ok(OpHeads::Single(op_heads.pop().unwrap()));
}
op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
Ok(OpHeads::Unresolved {
locked_op_heads,
op_heads,
})
}
}
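
As a rough illustration of the file-per-head storage (not a test from this
commit; assumes the directory already exists, the OpHeadsStore trait is in
scope, and OperationId implements Debug and PartialEq as elsewhere in jj):

use std::path::PathBuf;

fn demo(dir: PathBuf, id1: &OperationId, id2: &OperationId) {
    let store = SimpleOpHeadsStore::load(dir);
    store.add_op_head(id1); // creates an empty file named id1.hex()
    store.add_op_head(id2);
    assert_eq!(store.get_op_heads().len(), 2);
    // Removing an already-removed head is deliberately not an error; see
    // the comment in remove_op_head about distributed file systems.
    store.remove_op_head(id1);
    store.remove_op_head(id1);
    assert_eq!(store.get_op_heads(), vec![id2.clone()]);
}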


@@ -1003,7 +1003,7 @@ fn expand_git_path(path_str: String) -> PathBuf {
fn resolve_op_for_load(
op_store: &Arc<dyn OpStore>,
op_heads_store: &Arc<OpHeadsStore>,
op_heads_store: &Arc<dyn OpHeadsStore>,
op_str: &str,
) -> Result<OpHeads, CommandError> {
if op_str == "@" {
@@ -1022,7 +1022,7 @@ fn resolve_op_for_load(
fn resolve_single_op(
op_store: &Arc<dyn OpStore>,
op_heads_store: &Arc<OpHeadsStore>,
op_heads_store: &Arc<dyn OpHeadsStore>,
get_current_op: impl FnOnce() -> Result<Operation, CommandError>,
op_str: &str,
) -> Result<Operation, CommandError> {
@@ -1048,7 +1048,7 @@ fn resolve_single_op(
fn find_all_operations(
op_store: &Arc<dyn OpStore>,
op_heads_store: &Arc<OpHeadsStore>,
op_heads_store: &Arc<dyn OpHeadsStore>,
) -> Vec<Operation> {
let mut visited = HashSet::new();
let mut work: VecDeque<_> = op_heads_store.get_op_heads().into_iter().collect();
@@ -1066,7 +1066,7 @@ fn find_all_operations(
fn resolve_single_op_from_store(
op_store: &Arc<dyn OpStore>,
op_heads_store: &Arc<OpHeadsStore>,
op_heads_store: &Arc<dyn OpHeadsStore>,
op_str: &str,
) -> Result<Operation, CommandError> {
if op_str.is_empty() || !op_str.as_bytes().iter().all(|b| b.is_ascii_hexdigit()) {
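
The signature changes in this file only swap the concrete Arc<OpHeadsStore>
for the trait object Arc<dyn OpHeadsStore>; the concrete store coerces at the
Arc::new call site, as in this sketch (load_op_heads_store is a hypothetical
helper):

use std::path::PathBuf;
use std::sync::Arc;

// Unsized coercion from Arc<SimpleOpHeadsStore> to Arc<dyn OpHeadsStore>;
// downstream functions like resolve_op_for_load only see the trait.
fn load_op_heads_store(path: PathBuf) -> Arc<dyn OpHeadsStore> {
    Arc::new(SimpleOpHeadsStore::load(path))
}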