From 309a3f91a14eda2cb63e6ae653d72173b345bed1 Mon Sep 17 00:00:00 2001
From: Daniel Ploch
Date: Thu, 15 Dec 2022 17:13:00 -0500
Subject: [PATCH] op_heads_store: refactor into an interface and simple
 implementation

The implementation has some hoops to jump through because Rust does not
allow `self: &Arc<Self>` on trait methods, and two of the OpHeadsStore
functions need to return cloned selves. This is worked around by making
the implementation type itself a wrapper around Arc<>.

This is not particularly noteworthy for the current implementation type,
where the only data copied is a PathBuf, but for extensions it is likely
to be more critical that the lifetime management of the OpHeadsStore is
properly maintained.
---
 lib/src/lib.rs                   |   1 +
 lib/src/op_heads_store.rs        | 179 ++++-------------------
 lib/src/repo.rs                  |  13 +-
 lib/src/simple_op_heads_store.rs | 234 +++++++++++++++++++++++++++++++
 src/cli_util.rs                  |   8 +-
 5 files changed, 271 insertions(+), 164 deletions(-)
 create mode 100644 lib/src/simple_op_heads_store.rs

diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index 79dfe2302..1e33c538f 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -48,6 +48,7 @@ pub mod revset;
 pub mod revset_graph_iterator;
 pub mod rewrite;
 pub mod settings;
+pub mod simple_op_heads_store;
 pub mod simple_op_store;
 #[cfg(feature = "legacy-thrift")]
 mod simple_op_store_model;
diff --git a/lib/src/op_heads_store.rs b/lib/src/op_heads_store.rs
index a4b0923a2..bec0fa5ff 100644
--- a/lib/src/op_heads_store.rs
+++ b/lib/src/op_heads_store.rs
@@ -12,24 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
-use std::path::PathBuf;
+use std::fmt::Debug;
 use std::sync::Arc;
 
-use itertools::Itertools;
 use thiserror::Error;
 
-use crate::lock::FileLock;
-use crate::op_store::{OpStore, OperationId, OperationMetadata};
+use crate::op_store::{OpStore, OperationId};
 use crate::operation::Operation;
-use crate::{dag_walk, op_store};
-
-/// Manages the very set of current heads of the operation log. The store is
-/// simply a directory where each operation id is a file with that name (and no
-/// content).
-pub struct OpHeadsStore {
-    dir: PathBuf,
-}
 
 pub enum OpHeads {
     /// There's a single latest operation. This is the normal case.
@@ -48,157 +37,39 @@ pub enum OpHeadResolutionError {
     NoHeads,
 }
 
+pub trait LockedOpHeadsResolver {
+    fn finish(&self, new_op: &Operation);
+}
+
+// Represents a mutually exclusive lock on the OpHeadsStore in local systems.
 pub struct LockedOpHeads {
-    store: Arc<OpHeadsStore>,
-    _lock: FileLock,
+    resolver: Box<dyn LockedOpHeadsResolver>,
 }
 
 impl LockedOpHeads {
+    pub fn new(resolver: Box<dyn LockedOpHeadsResolver>) -> Self {
+        LockedOpHeads { resolver }
+    }
+
     pub fn finish(self, new_op: &Operation) {
-        self.store.add_op_head(new_op.id());
-        for old_id in new_op.parent_ids() {
-            self.store.remove_op_head(old_id);
-        }
+        self.resolver.finish(new_op);
     }
 }
 
-impl OpHeadsStore {
-    pub fn init(
-        dir: PathBuf,
-        op_store: &Arc<dyn OpStore>,
-        root_view: &op_store::View,
-        operation_metadata: OperationMetadata,
-    ) -> (Self, Operation) {
-        let root_view_id = op_store.write_view(root_view).unwrap();
-        let init_operation = op_store::Operation {
-            view_id: root_view_id,
-            parents: vec![],
-            metadata: operation_metadata,
-        };
-        let init_operation_id = op_store.write_operation(&init_operation).unwrap();
-        let init_operation = Operation::new(op_store.clone(), init_operation_id, init_operation);
+/// Manages the set of current heads of the operation log.
+///
+/// Implementations should use Arc<> internally, as the lock() and
+/// get_heads() return values which might outlive the original object. When Rust
+/// makes it possible for a trait method to reference &Arc<Self>, this can be
+/// simplified.
+pub trait OpHeadsStore: Send + Sync + Debug {
+    fn add_op_head(&self, id: &OperationId);
 
-        let op_heads_store = OpHeadsStore { dir };
-        op_heads_store.add_op_head(init_operation.id());
-        (op_heads_store, init_operation)
-    }
+    fn remove_op_head(&self, id: &OperationId);
 
-    pub fn load(dir: PathBuf) -> OpHeadsStore {
-        OpHeadsStore { dir }
-    }
+    fn get_op_heads(&self) -> Vec<OperationId>;
 
-    fn add_op_head(&self, id: &OperationId) {
-        std::fs::write(self.dir.join(id.hex()), "").unwrap();
-    }
+    fn lock(&self) -> LockedOpHeads;
 
-    fn remove_op_head(&self, id: &OperationId) {
-        // It's fine if the old head was not found. It probably means
-        // that we're on a distributed file system where the locking
-        // doesn't work. We'll probably end up with two current
-        // heads. We'll detect that next time we load the view.
-        std::fs::remove_file(self.dir.join(id.hex())).ok();
-    }
-
-    pub fn get_op_heads(&self) -> Vec<OperationId> {
-        let mut op_heads = vec![];
-        for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
-            let op_head_file_name = op_head_entry.unwrap().file_name();
-            let op_head_file_name = op_head_file_name.to_str().unwrap();
-            if let Ok(op_head) = hex::decode(op_head_file_name) {
-                op_heads.push(OperationId::new(op_head));
-            }
-        }
-        op_heads
-    }
-
-    pub fn lock(self: &Arc<Self>) -> LockedOpHeads {
-        let lock = FileLock::lock(self.dir.join("lock"));
-        LockedOpHeads {
-            store: self.clone(),
-            _lock: lock,
-        }
-    }
-
-    pub fn get_heads(
-        self: &Arc<Self>,
-        op_store: &Arc<dyn OpStore>,
-    ) -> Result<OpHeads, OpHeadResolutionError> {
-        let mut op_heads = self.get_op_heads();
-
-        if op_heads.is_empty() {
-            return Err(OpHeadResolutionError::NoHeads);
-        }
-
-        if op_heads.len() == 1 {
-            let operation_id = op_heads.pop().unwrap();
-            let operation = op_store.read_operation(&operation_id).unwrap();
-            return Ok(OpHeads::Single(Operation::new(
-                op_store.clone(),
-                operation_id,
-                operation,
-            )));
-        }
-
-        // There are multiple heads. We take a lock, then check if there are still
-        // multiple heads (it's likely that another process was in the process of
-        // deleting on of them). If there are still multiple heads, we attempt to
-        // merge all the views into one. We then write that view and a corresponding
-        // operation to the op-store.
-        // Note that the locking isn't necessary for correctness; we take the lock
-        // only to prevent other concurrent processes from doing the same work (and
-        // producing another set of divergent heads).
-        let locked_op_heads = self.lock();
-        let op_head_ids = self.get_op_heads();
-
-        if op_head_ids.is_empty() {
-            return Err(OpHeadResolutionError::NoHeads);
-        }
-
-        if op_head_ids.len() == 1 {
-            let op_head_id = op_head_ids[0].clone();
-            let op_head = op_store.read_operation(&op_head_id).unwrap();
-            // Return early so we don't write a merge operation with a single parent
-            return Ok(OpHeads::Single(Operation::new(
-                op_store.clone(),
-                op_head_id,
-                op_head,
-            )));
-        }
-
-        let op_heads = op_head_ids
-            .iter()
-            .map(|op_id: &OperationId| {
-                let data = op_store.read_operation(op_id).unwrap();
-                Operation::new(op_store.clone(), op_id.clone(), data)
-            })
-            .collect_vec();
-        let mut op_heads = self.handle_ancestor_ops(op_heads);
-
-        // Return without creating a merge operation
-        if op_heads.len() == 1 {
-            return Ok(OpHeads::Single(op_heads.pop().unwrap()));
-        }
-
-        op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
-        Ok(OpHeads::Unresolved {
-            locked_op_heads,
-            op_heads,
-        })
-    }
-
-    /// Removes operations in the input that are ancestors of other operations
-    /// in the input. The ancestors are removed both from the list and from
-    /// disk.
-    fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> Vec<Operation> {
-        let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
-        let neighbors_fn = |op: &Operation| op.parents();
-        // Remove ancestors so we don't create merge operation with an operation and its
-        // ancestor
-        let op_heads = dag_walk::heads(op_heads, &neighbors_fn, &|op: &Operation| op.id().clone());
-        let op_head_ids_after: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
-        for removed_op_head in op_head_ids_before.difference(&op_head_ids_after) {
-            self.remove_op_head(removed_op_head);
-        }
-        op_heads.into_iter().collect()
-    }
+
+    fn get_heads(&self, op_store: &Arc<dyn OpStore>) -> Result<OpHeads, OpHeadResolutionError>;
 }
diff --git a/lib/src/repo.rs b/lib/src/repo.rs
index 562fcec08..540a5397b 100644
--- a/lib/src/repo.rs
+++ b/lib/src/repo.rs
@@ -37,6 +37,7 @@ use crate::op_store::{BranchTarget, OpStore, OperationId, RefTarget, WorkspaceId
 use crate::operation::Operation;
 use crate::rewrite::DescendantRebaser;
 use crate::settings::{RepoSettings, UserSettings};
+use crate::simple_op_heads_store::SimpleOpHeadsStore;
 use crate::simple_op_store::SimpleOpStore;
 use crate::store::Store;
 use crate::transaction::Transaction;
@@ -92,7 +93,7 @@ pub struct ReadonlyRepo {
     repo_path: PathBuf,
     store: Arc<Store>,
     op_store: Arc<dyn OpStore>,
-    op_heads_store: Arc<OpHeadsStore>,
+    op_heads_store: Arc<dyn OpHeadsStore>,
     operation: Operation,
     settings: RepoSettings,
     index_store: Arc<IndexStore>,
@@ -148,7 +149,7 @@ impl ReadonlyRepo {
         let operation_metadata =
             crate::transaction::create_op_metadata(user_settings, "initialize repo".to_string());
         let (op_heads_store, init_op) =
-            OpHeadsStore::init(op_heads_path, &op_store, &root_view, operation_metadata);
+            SimpleOpHeadsStore::init(op_heads_path, &op_store, &root_view, operation_metadata);
         let op_heads_store = Arc::new(op_heads_store);
 
         let index_path = repo_path.join("index");
@@ -225,7 +226,7 @@ impl ReadonlyRepo {
         &self.op_store
     }
 
-    pub fn op_heads_store(&self) -> &Arc<OpHeadsStore> {
+    pub fn op_heads_store(&self) -> &Arc<dyn OpHeadsStore> {
         &self.op_heads_store
     }
 
@@ -392,7 +393,7 @@ pub struct RepoLoader {
     repo_settings: RepoSettings,
     store: Arc<Store>,
     op_store: Arc<dyn OpStore>,
-    op_heads_store: Arc<OpHeadsStore>,
+    op_heads_store: Arc<dyn OpHeadsStore>,
     index_store: Arc<IndexStore>,
 }
 
@@ -405,7 +406,7 @@ impl RepoLoader {
         let store = Store::new(store_factories.load_backend(&repo_path.join("store")));
         let repo_settings = user_settings.with_repo(repo_path).unwrap();
         let op_store = Arc::from(store_factories.load_op_store(&repo_path.join("op_store")));
-        let op_heads_store = Arc::new(OpHeadsStore::load(repo_path.join("op_heads")));
+        let op_heads_store = Arc::new(SimpleOpHeadsStore::load(repo_path.join("op_heads")));
         let index_store = Arc::new(IndexStore::load(repo_path.join("index")));
         Self {
             repo_path: repo_path.to_path_buf(),
@@ -433,7 +434,7 @@ impl RepoLoader {
         &self.op_store
     }
 
-    pub fn op_heads_store(&self) -> &Arc<OpHeadsStore> {
+    pub fn op_heads_store(&self) -> &Arc<dyn OpHeadsStore> {
         &self.op_heads_store
     }
 
diff --git a/lib/src/simple_op_heads_store.rs b/lib/src/simple_op_heads_store.rs
new file mode 100644
index 000000000..3d0b278cf
--- /dev/null
+++ b/lib/src/simple_op_heads_store.rs
@@ -0,0 +1,234 @@
+// Copyright 2021-2022 The Jujutsu Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+use std::fmt::{Debug, Formatter};
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use itertools::Itertools;
+
+use crate::lock::FileLock;
+use crate::op_heads_store::{
+    LockedOpHeads, LockedOpHeadsResolver, OpHeadResolutionError, OpHeads, OpHeadsStore,
+};
+use crate::op_store::{OpStore, OperationId, OperationMetadata};
+use crate::operation::Operation;
+use crate::{dag_walk, op_store};
+
+pub struct SimpleOpHeadsStore {
+    store: Arc<InnerSimpleOpHeadsStore>,
+}
+
+impl Debug for SimpleOpHeadsStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SimpleOpHeadsStore")
+            .field("dir", &self.store.dir)
+            .finish()
+    }
+}
+
+/// Manages the set of current heads of the operation log. This store is
+/// simply a directory where each operation id is a file with that name (and no
+/// content).
+struct InnerSimpleOpHeadsStore {
+    dir: PathBuf,
+}
+
+struct SimpleOpHeadsStoreLockResolver {
+    store: Arc<InnerSimpleOpHeadsStore>,
+    _lock: FileLock,
+}
+
+impl LockedOpHeadsResolver for SimpleOpHeadsStoreLockResolver {
+    fn finish(&self, new_op: &Operation) {
+        self.store.add_op_head(new_op.id());
+        for old_id in new_op.parent_ids() {
+            self.store.remove_op_head(old_id);
+        }
+    }
+}
+
+impl InnerSimpleOpHeadsStore {
+    pub fn init(
+        dir: PathBuf,
+        op_store: &Arc<dyn OpStore>,
+        root_view: &op_store::View,
+        operation_metadata: OperationMetadata,
+    ) -> (Self, Operation) {
+        let root_view_id = op_store.write_view(root_view).unwrap();
+        let init_operation = op_store::Operation {
+            view_id: root_view_id,
+            parents: vec![],
+            metadata: operation_metadata,
+        };
+        let init_operation_id = op_store.write_operation(&init_operation).unwrap();
+        let init_operation = Operation::new(op_store.clone(), init_operation_id, init_operation);
+
+        let op_heads_store = InnerSimpleOpHeadsStore { dir };
+        op_heads_store.add_op_head(init_operation.id());
+        (op_heads_store, init_operation)
+    }
+
+    pub fn add_op_head(&self, id: &OperationId) {
+        std::fs::write(self.dir.join(id.hex()), "").unwrap();
+    }
+
+    pub fn remove_op_head(&self, id: &OperationId) {
+        // It's fine if the old head was not found. It probably means
+        // that we're on a distributed file system where the locking
+        // doesn't work. We'll probably end up with two current
+        // heads. We'll detect that next time we load the view.
+        std::fs::remove_file(self.dir.join(id.hex())).ok();
+    }
+
+    pub fn get_op_heads(&self) -> Vec<OperationId> {
+        let mut op_heads = vec![];
+        for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
+            let op_head_file_name = op_head_entry.unwrap().file_name();
+            let op_head_file_name = op_head_file_name.to_str().unwrap();
+            if let Ok(op_head) = hex::decode(op_head_file_name) {
+                op_heads.push(OperationId::new(op_head));
+            }
+        }
+        op_heads
+    }
+
+    /// Removes operations in the input that are ancestors of other operations
+    /// in the input. The ancestors are removed both from the list and from
+    /// disk.
+    /// TODO: Move this into the OpStore trait for sharing
+    fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> Vec<Operation> {
+        let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
+        let neighbors_fn = |op: &Operation| op.parents();
+        // Remove ancestors so we don't create a merge operation with an operation and
+        // its ancestor
+        let op_heads = dag_walk::heads(op_heads, &neighbors_fn, &|op: &Operation| op.id().clone());
+        let op_head_ids_after: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
+        for removed_op_head in op_head_ids_before.difference(&op_head_ids_after) {
+            self.remove_op_head(removed_op_head);
+        }
+        op_heads.into_iter().collect()
+    }
+}
+
+impl SimpleOpHeadsStore {
+    pub fn init(
+        dir: PathBuf,
+        op_store: &Arc<dyn OpStore>,
+        root_view: &op_store::View,
+        operation_metadata: OperationMetadata,
+    ) -> (Self, Operation) {
+        let (inner, init_op) =
+            InnerSimpleOpHeadsStore::init(dir, op_store, root_view, operation_metadata);
+        (
+            SimpleOpHeadsStore {
+                store: Arc::new(inner),
+            },
+            init_op,
+        )
+    }
+
+    pub fn load(dir: PathBuf) -> Self {
+        SimpleOpHeadsStore {
+            store: Arc::new(InnerSimpleOpHeadsStore { dir }),
+        }
+    }
+}
+
+impl OpHeadsStore for SimpleOpHeadsStore {
+    fn add_op_head(&self, id: &OperationId) {
+        self.store.add_op_head(id);
+    }
+
+    fn remove_op_head(&self, id: &OperationId) {
+        self.store.remove_op_head(id);
+    }
+
+    fn get_op_heads(&self) -> Vec<OperationId> {
+        self.store.get_op_heads()
+    }
+
+    fn lock(&self) -> LockedOpHeads {
+        let lock = FileLock::lock(self.store.dir.join("lock"));
+        LockedOpHeads::new(Box::new(SimpleOpHeadsStoreLockResolver {
+            store: self.store.clone(),
+            _lock: lock,
+        }))
+    }
+
+    fn get_heads(&self, op_store: &Arc<dyn OpStore>) -> Result<OpHeads, OpHeadResolutionError> {
+        let mut op_heads = self.get_op_heads();
+
+        if op_heads.is_empty() {
+            return Err(OpHeadResolutionError::NoHeads);
+        }
+
+        if op_heads.len() == 1 {
+            let operation_id = op_heads.pop().unwrap();
+            let operation = op_store.read_operation(&operation_id).unwrap();
+            return Ok(OpHeads::Single(Operation::new(
+                op_store.clone(),
+                operation_id,
+                operation,
+            )));
+        }
+
+        // There are multiple heads. We take a lock, then check if there are still
+        // multiple heads (it's likely that another process was in the process of
+        // deleting one of them). If there are still multiple heads, we attempt to
+        // merge all the views into one. We then write that view and a corresponding
+        // operation to the op-store.
+        // Note that the locking isn't necessary for correctness; we take the lock
+        // only to prevent other concurrent processes from doing the same work (and
+        // producing another set of divergent heads).
+        let locked_op_heads = self.lock();
+        let op_head_ids = self.get_op_heads();
+
+        if op_head_ids.is_empty() {
+            return Err(OpHeadResolutionError::NoHeads);
+        }
+
+        if op_head_ids.len() == 1 {
+            let op_head_id = op_head_ids[0].clone();
+            let op_head = op_store.read_operation(&op_head_id).unwrap();
+            // Return early so we don't write a merge operation with a single parent
+            return Ok(OpHeads::Single(Operation::new(
+                op_store.clone(),
+                op_head_id,
+                op_head,
+            )));
+        }
+
+        let op_heads = op_head_ids
+            .iter()
+            .map(|op_id: &OperationId| {
+                let data = op_store.read_operation(op_id).unwrap();
+                Operation::new(op_store.clone(), op_id.clone(), data)
+            })
+            .collect_vec();
+        let mut op_heads = self.store.handle_ancestor_ops(op_heads);
+
+        // Return without creating a merge operation
+        if op_heads.len() == 1 {
+            return Ok(OpHeads::Single(op_heads.pop().unwrap()));
+        }
+
+        op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
+        Ok(OpHeads::Unresolved {
+            locked_op_heads,
+            op_heads,
+        })
+    }
+}
diff --git a/src/cli_util.rs b/src/cli_util.rs
index 29c4cd70e..64ebe4d8b 100644
--- a/src/cli_util.rs
+++ b/src/cli_util.rs
@@ -1003,7 +1003,7 @@ fn expand_git_path(path_str: String) -> PathBuf {
 
 fn resolve_op_for_load(
     op_store: &Arc<dyn OpStore>,
-    op_heads_store: &Arc<OpHeadsStore>,
+    op_heads_store: &Arc<dyn OpHeadsStore>,
     op_str: &str,
 ) -> Result<OpHeads, CommandError> {
     if op_str == "@" {
@@ -1022,7 +1022,7 @@ fn resolve_op_for_load(
 
 fn resolve_single_op(
     op_store: &Arc<dyn OpStore>,
-    op_heads_store: &Arc<OpHeadsStore>,
+    op_heads_store: &Arc<dyn OpHeadsStore>,
     get_current_op: impl FnOnce() -> Result<Operation, CommandError>,
     op_str: &str,
 ) -> Result<Operation, CommandError> {
@@ -1048,7 +1048,7 @@ fn resolve_single_op(
 
 fn find_all_operations(
     op_store: &Arc<dyn OpStore>,
-    op_heads_store: &Arc<OpHeadsStore>,
+    op_heads_store: &Arc<dyn OpHeadsStore>,
 ) -> Vec<Operation> {
     let mut visited = HashSet::new();
     let mut work: VecDeque<_> = op_heads_store.get_op_heads().into_iter().collect();
@@ -1066,7 +1066,7 @@ fn find_all_operations(
 
 fn resolve_single_op_from_store(
     op_store: &Arc<dyn OpStore>,
-    op_heads_store: &Arc<OpHeadsStore>,
+    op_heads_store: &Arc<dyn OpHeadsStore>,
     op_str: &str,
 ) -> Result<Operation, CommandError> {
     if op_str.is_empty() || !op_str.as_bytes().iter().all(|b| b.is_ascii_hexdigit()) {
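
Note on the workaround described in the commit message: the following is a
minimal, self-contained sketch of the Arc-wrapper pattern, using hypothetical
names (Store, Inner, SimpleStore, SimpleResolver) rather than the types in this
patch. Because a `self: &Arc<Self>` receiver is not accepted on trait methods,
the shared state lives in an inner type and the public type is a thin wrapper
around Arc<Inner>, so a plain `&self` method can hand the returned handle an
owned clone of the state:

    use std::sync::Arc;

    // Hypothetical stand-ins for LockedOpHeadsResolver / LockedOpHeads.
    trait Resolver {
        fn finish(&self, msg: &str);
    }

    struct Locked {
        resolver: Box<dyn Resolver>,
    }

    impl Locked {
        fn finish(self, msg: &str) {
            self.resolver.finish(msg);
        }
    }

    // The trait method takes plain `&self`; it cannot clone an Arc of
    // `Self` directly, which is what forces the wrapper pattern below.
    trait Store {
        fn lock(&self) -> Locked;
    }

    // The real state lives in an inner type...
    struct Inner {
        dir: String,
    }

    // ...and the public type is a thin wrapper around Arc<Inner>.
    struct SimpleStore {
        inner: Arc<Inner>,
    }

    struct SimpleResolver {
        store: Arc<Inner>, // co-owns the state, so it may outlive SimpleStore
    }

    impl Resolver for SimpleResolver {
        fn finish(&self, msg: &str) {
            println!("finish({msg}) against {}", self.store.dir);
        }
    }

    impl Store for SimpleStore {
        fn lock(&self) -> Locked {
            Locked {
                resolver: Box::new(SimpleResolver {
                    store: self.inner.clone(), // clone the Arc, not the data
                }),
            }
        }
    }

    fn main() {
        let store = SimpleStore {
            inner: Arc::new(Inner { dir: ".jj/op_heads".to_string() }),
        };
        let locked = store.lock();
        drop(store); // Inner stays alive through the resolver's Arc
        locked.finish("new-op");
    }

Dropping the store does not invalidate the handle: the resolver co-owns the
inner state through the cloned Arc, which is the lifetime property the commit
message calls out as important for extensions.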