From 11525bfba0624fb1b6093eff8ee9127569257d15 Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Fri, 18 Mar 2022 23:11:10 -0700 Subject: [PATCH] repo: move resolving of concurrent operations to repo level (#111) It's useful for the UI layer to know that there's been concurrent operations, so it can inform the user that that happened. It'll be even more useful when we soon start making the resolution involve rebasing commits, since that's even more important for the UI layer to present to the user. This patch gets us a bit closer to that by moving the resolution to the repo level. --- lib/src/op_heads_store.rs | 67 +++++++++++++++++---------------------- lib/src/repo.rs | 47 +++++++++++++++++++++++---- 2 files changed, 70 insertions(+), 44 deletions(-) diff --git a/lib/src/op_heads_store.rs b/lib/src/op_heads_store.rs index 568496cd6..0eddc71fb 100644 --- a/lib/src/op_heads_store.rs +++ b/lib/src/op_heads_store.rs @@ -24,7 +24,6 @@ use crate::lock::FileLock; use crate::op_store::{OpStore, OperationId, OperationMetadata}; use crate::operation::Operation; use crate::repo::RepoLoader; -use crate::transaction::UnpublishedOperation; use crate::{dag_walk, op_store}; /// Manages the very set of current heads of the operation log. The store is @@ -34,6 +33,17 @@ pub struct OpHeadsStore { dir: PathBuf, } +pub enum OpHeads { + /// There's a single latest operation. This is the normal case. + Single(Operation), + /// There are multiple latest operations, which means there has been + /// concurrent operations. These need to be resolved. + Unresolved { + locked_op_heads: LockedOpHeads, + op_heads: Vec, + }, +} + #[derive(Debug, Error, PartialEq, Eq)] pub enum OpHeadResolutionError { #[error("Operation log has no heads")] @@ -112,10 +122,10 @@ impl OpHeadsStore { } } - pub fn get_single_op_head( + pub fn get_heads( self: &Arc, repo_loader: &RepoLoader, - ) -> Result { + ) -> Result { let mut op_heads = self.get_op_heads(); if op_heads.is_empty() { @@ -127,7 +137,11 @@ impl OpHeadsStore { if op_heads.len() == 1 { let operation_id = op_heads.pop().unwrap(); let operation = op_store.read_operation(&operation_id).unwrap(); - return Ok(Operation::new(op_store.clone(), operation_id, operation)); + return Ok(OpHeads::Single(Operation::new( + op_store.clone(), + operation_id, + operation, + ))); } // There are multiple heads. We take a lock, then check if there are still @@ -136,7 +150,7 @@ impl OpHeadsStore { // merge all the views into one. We then write that view and a corresponding // operation to the op-store. // Note that the locking isn't necessary for correctness; we take the lock - // only to avoid other concurrent processes from doing the same work (and + // only to prevent other concurrent processes from doing the same work (and // producing another set of divergent heads). let locked_op_heads = self.lock(); let op_head_ids = self.get_op_heads(); @@ -149,7 +163,11 @@ impl OpHeadsStore { let op_head_id = op_head_ids[0].clone(); let op_head = op_store.read_operation(&op_head_id).unwrap(); // Return early so we don't write a merge operation with a single parent - return Ok(Operation::new(op_store.clone(), op_head_id, op_head)); + return Ok(OpHeads::Single(Operation::new( + op_store.clone(), + op_head_id, + op_head, + ))); } let op_heads = op_head_ids @@ -163,14 +181,13 @@ impl OpHeadsStore { // Return without creating a merge operation if op_heads.len() == 1 { - return Ok(op_heads.pop().unwrap()); + return Ok(OpHeads::Single(op_heads.pop().unwrap())); } - let merged_repo = merge_op_heads(repo_loader, op_heads).leave_unpublished(); - let merge_operation = merged_repo.operation().clone(); - locked_op_heads.finish(&merge_operation); - // TODO: Change the return type include the repo if we have it (as we do here) - Ok(merge_operation) + Ok(OpHeads::Unresolved { + locked_op_heads, + op_heads, + }) } /// Removes operations in the input that are ancestors of other operations @@ -189,29 +206,3 @@ impl OpHeadsStore { op_heads.into_iter().collect() } } - -fn merge_op_heads(repo_loader: &RepoLoader, mut op_heads: Vec) -> UnpublishedOperation { - op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone()); - let base_repo = repo_loader.load_at(&op_heads[0]); - let mut tx = base_repo.start_transaction("resolve concurrent operations"); - let merged_repo = tx.mut_repo(); - let neighbors_fn = |op: &Operation| op.parents(); - for (i, other_op_head) in op_heads.iter().enumerate().skip(1) { - let ancestor_op = dag_walk::closest_common_node( - op_heads[0..i].to_vec(), - vec![other_op_head.clone()], - &neighbors_fn, - &|op: &Operation| op.id().clone(), - ) - .unwrap(); - let base_repo = repo_loader.load_at(&ancestor_op); - let other_repo = repo_loader.load_at(other_op_head); - merged_repo.merge(&base_repo, &other_repo); - } - let op_parent_ids = op_heads.iter().map(|op| op.id().clone()).collect(); - tx.set_parents(op_parent_ids); - // TODO: We already have the resulting View in this case but Operation cannot - // keep it. Teach Operation to have a cached View so the caller won't have - // to re-read it from the store (by calling Operation::view())? - tx.write() -} diff --git a/lib/src/repo.rs b/lib/src/repo.rs index f76703fd1..1053497d7 100644 --- a/lib/src/repo.rs +++ b/lib/src/repo.rs @@ -25,17 +25,17 @@ use thiserror::Error; use crate::backend::{BackendError, CommitId}; use crate::commit::Commit; use crate::commit_builder::CommitBuilder; -use crate::dag_walk::topo_order_reverse; +use crate::dag_walk::{closest_common_node, topo_order_reverse}; use crate::index::{IndexRef, MutableIndex, ReadonlyIndex}; use crate::index_store::IndexStore; -use crate::op_heads_store::OpHeadsStore; +use crate::op_heads_store::{OpHeads, OpHeadsStore}; use crate::op_store::{BranchTarget, OpStore, OperationId, RefTarget, WorkspaceId}; use crate::operation::Operation; use crate::rewrite::DescendantRebaser; use crate::settings::{RepoSettings, UserSettings}; use crate::simple_op_store::SimpleOpStore; use crate::store::Store; -use crate::transaction::Transaction; +use crate::transaction::{Transaction, UnpublishedOperation}; use crate::view::{RefName, View}; use crate::{backend, op_store}; @@ -332,9 +332,44 @@ impl RepoLoader { } pub fn load_at_head(&self) -> Arc { - let op = self.op_heads_store.get_single_op_head(self).unwrap(); - let view = View::new(op.view().take_store_view()); - self._finish_load(op, view) + let op_heads = self.op_heads_store.get_heads(self).unwrap(); + match op_heads { + OpHeads::Single(op) => { + let view = View::new(op.view().take_store_view()); + self._finish_load(op, view) + } + OpHeads::Unresolved { + locked_op_heads, + op_heads, + } => { + let merged_repo = self.merge_op_heads(op_heads).leave_unpublished(); + locked_op_heads.finish(merged_repo.operation()); + merged_repo + } + } + } + + fn merge_op_heads(&self, mut op_heads: Vec) -> UnpublishedOperation { + op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone()); + let base_repo = self.load_at(&op_heads[0]); + let mut tx = base_repo.start_transaction("resolve concurrent operations"); + let merged_repo = tx.mut_repo(); + let neighbors_fn = |op: &Operation| op.parents(); + for (i, other_op_head) in op_heads.iter().enumerate().skip(1) { + let ancestor_op = closest_common_node( + op_heads[0..i].to_vec(), + vec![other_op_head.clone()], + &neighbors_fn, + &|op: &Operation| op.id().clone(), + ) + .unwrap(); + let base_repo = self.load_at(&ancestor_op); + let other_repo = self.load_at(other_op_head); + merged_repo.merge(&base_repo, &other_repo); + } + let op_parent_ids = op_heads.iter().map(|op| op.id().clone()).collect(); + tx.set_parents(op_parent_ids); + tx.write() } pub fn load_at(&self, op: &Operation) -> Arc {