repo: move resolving of concurrent operations to repo level (#111)

It's useful for the UI layer to know that there have been concurrent
operations, so it can inform the user that this happened. It will be
even more useful once we start making the resolution involve rebasing
commits, since that is even more important for the UI layer to present
to the user. This patch gets us a bit closer by moving the resolution
to the repo level.
Author: Martin von Zweigbergk
Date: 2022-03-18 23:11:10 -07:00
Committer: Martin von Zweigbergk
Parent: 364f8010e7
Commit: 11525bfba0
2 changed files with 70 additions and 44 deletions
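To make the motivation concrete, here is a rough sketch of how a frontend could consume the new API. Only `OpHeads`, its variants, and `OpHeadsStore::get_heads` come from this patch; `notify_user`, the `op_heads_store()` accessor, and `load_repo_verbosely` are invented for illustration, and the `Unresolved` arm mirrors what `RepoLoader::load_at_head` does internally (where `merge_op_heads` is actually private):

use std::sync::Arc;

use crate::op_heads_store::OpHeads;
use crate::repo::{ReadonlyRepo, RepoLoader};

// Hypothetical UI hook; not part of this patch.
fn notify_user(message: &str) {
    eprintln!("{}", message);
}

// Sketch of a frontend entry point (assumes an op_heads_store() accessor on
// RepoLoader and visibility the real code may not expose).
fn load_repo_verbosely(loader: &RepoLoader) -> Arc<ReadonlyRepo> {
    match loader.op_heads_store().get_heads(loader).unwrap() {
        // The normal case: a single head; load the repo at it.
        OpHeads::Single(op) => loader.load_at(&op),
        // Concurrent operations happened; the UI can now say so before
        // resolving them.
        OpHeads::Unresolved {
            locked_op_heads,
            op_heads,
        } => {
            notify_user(&format!(
                "Concurrent modification detected; merging {} operation heads",
                op_heads.len()
            ));
            // This mirrors RepoLoader::load_at_head below; merge_op_heads is
            // private in the actual patch.
            let merged_repo = loader.merge_op_heads(op_heads).leave_unpublished();
            locked_op_heads.finish(merged_repo.operation());
            merged_repo
        }
    }
}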

op_heads_store.rs

@@ -24,7 +24,6 @@ use crate::lock::FileLock;
 use crate::op_store::{OpStore, OperationId, OperationMetadata};
 use crate::operation::Operation;
 use crate::repo::RepoLoader;
-use crate::transaction::UnpublishedOperation;
 use crate::{dag_walk, op_store};
 
 /// Manages the very set of current heads of the operation log. The store is
@@ -34,6 +33,17 @@ pub struct OpHeadsStore {
     dir: PathBuf,
 }
 
+pub enum OpHeads {
+    /// There's a single latest operation. This is the normal case.
+    Single(Operation),
+    /// There are multiple latest operations, which means there have been
+    /// concurrent operations. These need to be resolved.
+    Unresolved {
+        locked_op_heads: LockedOpHeads,
+        op_heads: Vec<Operation>,
+    },
+}
+
 #[derive(Debug, Error, PartialEq, Eq)]
 pub enum OpHeadResolutionError {
     #[error("Operation log has no heads")]
@@ -112,10 +122,10 @@ impl OpHeadsStore {
         }
     }
 
-    pub fn get_single_op_head(
+    pub fn get_heads(
         self: &Arc<Self>,
         repo_loader: &RepoLoader,
-    ) -> Result<Operation, OpHeadResolutionError> {
+    ) -> Result<OpHeads, OpHeadResolutionError> {
         let mut op_heads = self.get_op_heads();
 
         if op_heads.is_empty() {
@@ -127,7 +137,11 @@
         if op_heads.len() == 1 {
             let operation_id = op_heads.pop().unwrap();
             let operation = op_store.read_operation(&operation_id).unwrap();
-            return Ok(Operation::new(op_store.clone(), operation_id, operation));
+            return Ok(OpHeads::Single(Operation::new(
+                op_store.clone(),
+                operation_id,
+                operation,
+            )));
         }
 
         // There are multiple heads. We take a lock, then check if there are still
@@ -136,7 +150,7 @@
         // merge all the views into one. We then write that view and a corresponding
         // operation to the op-store.
         // Note that the locking isn't necessary for correctness; we take the lock
-        // only to avoid other concurrent processes from doing the same work (and
+        // only to prevent other concurrent processes from doing the same work (and
         // producing another set of divergent heads).
         let locked_op_heads = self.lock();
         let op_head_ids = self.get_op_heads();
@@ -149,7 +163,11 @@
             let op_head_id = op_head_ids[0].clone();
             let op_head = op_store.read_operation(&op_head_id).unwrap();
             // Return early so we don't write a merge operation with a single parent
-            return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
+            return Ok(OpHeads::Single(Operation::new(
+                op_store.clone(),
+                op_head_id,
+                op_head,
+            )));
         }
 
         let op_heads = op_head_ids
@@ -163,14 +181,13 @@
         // Return without creating a merge operation
         if op_heads.len() == 1 {
-            return Ok(op_heads.pop().unwrap());
+            return Ok(OpHeads::Single(op_heads.pop().unwrap()));
         }
 
-        let merged_repo = merge_op_heads(repo_loader, op_heads).leave_unpublished();
-        let merge_operation = merged_repo.operation().clone();
-        locked_op_heads.finish(&merge_operation);
-        // TODO: Change the return type include the repo if we have it (as we do here)
-        Ok(merge_operation)
+        Ok(OpHeads::Unresolved {
+            locked_op_heads,
+            op_heads,
+        })
     }
 
     /// Removes operations in the input that are ancestors of other operations
@@ -189,29 +206,3 @@ impl OpHeadsStore {
         op_heads.into_iter().collect()
     }
 }
-
-fn merge_op_heads(repo_loader: &RepoLoader, mut op_heads: Vec<Operation>) -> UnpublishedOperation {
-    op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
-    let base_repo = repo_loader.load_at(&op_heads[0]);
-    let mut tx = base_repo.start_transaction("resolve concurrent operations");
-    let merged_repo = tx.mut_repo();
-    let neighbors_fn = |op: &Operation| op.parents();
-    for (i, other_op_head) in op_heads.iter().enumerate().skip(1) {
-        let ancestor_op = dag_walk::closest_common_node(
-            op_heads[0..i].to_vec(),
-            vec![other_op_head.clone()],
-            &neighbors_fn,
-            &|op: &Operation| op.id().clone(),
-        )
-        .unwrap();
-        let base_repo = repo_loader.load_at(&ancestor_op);
-        let other_repo = repo_loader.load_at(other_op_head);
-        merged_repo.merge(&base_repo, &other_repo);
-    }
-    let op_parent_ids = op_heads.iter().map(|op| op.id().clone()).collect();
-    tx.set_parents(op_parent_ids);
-    // TODO: We already have the resulting View in this case but Operation cannot
-    // keep it. Teach Operation to have a cached View so the caller won't have
-    // to re-read it from the store (by calling Operation::view())?
-    tx.write()
-}
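Both files move `merge_op_heads` rather than change its algorithm: the heads are sorted by operation end time, the repo is loaded at the first head, and each remaining head is folded in by a three-way merge against the closest common ancestor operation. As a self-contained illustration of that base selection, here is a simplified stand-in for `dag_walk::closest_common_node` over a toy DAG of string ids (the real implementation differs):

use std::collections::{HashMap, HashSet, VecDeque};

// Toy operation DAG: each id maps to its parent ids. This is an illustrative
// simplification of dag_walk::closest_common_node, not jj's implementation.
fn closest_common_ancestor(
    parents: &HashMap<&str, Vec<&str>>,
    a: &str,
    b: &str,
) -> Option<String> {
    // Collect all ancestors of `a` (including `a` itself).
    let mut ancestors_of_a = HashSet::new();
    let mut queue = VecDeque::from([a]);
    while let Some(id) = queue.pop_front() {
        if ancestors_of_a.insert(id) {
            queue.extend(parents.get(id).into_iter().flatten().copied());
        }
    }
    // Walk ancestors of `b` breadth-first; the first one that is also an
    // ancestor of `a` is the closest common ancestor (by distance from `b`).
    let mut seen = HashSet::new();
    let mut queue = VecDeque::from([b]);
    while let Some(id) = queue.pop_front() {
        if ancestors_of_a.contains(id) {
            return Some(id.to_string());
        }
        if seen.insert(id) {
            queue.extend(parents.get(id).into_iter().flatten().copied());
        }
    }
    None
}

fn main() {
    // op1 <- op2 <- op3   (one process)
    //    \-- op4          (another process, concurrently)
    let parents = HashMap::from([
        ("op1", vec![]),
        ("op2", vec!["op1"]),
        ("op3", vec!["op2"]),
        ("op4", vec!["op1"]),
    ]);
    // Merging heads op3 and op4 should use op1 as the three-way merge base.
    let base = closest_common_ancestor(&parents, "op3", "op4");
    assert_eq!(base.as_deref(), Some("op1"));
}

In the real code the nodes are `Operation`s, the neighbors function returns `op.parents()`, and the merge itself is a three-way merge of views; the toy only shows how the merge base is picked.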

repo.rs

@@ -25,17 +25,17 @@ use thiserror::Error;
 use crate::backend::{BackendError, CommitId};
 use crate::commit::Commit;
 use crate::commit_builder::CommitBuilder;
-use crate::dag_walk::topo_order_reverse;
+use crate::dag_walk::{closest_common_node, topo_order_reverse};
 use crate::index::{IndexRef, MutableIndex, ReadonlyIndex};
 use crate::index_store::IndexStore;
-use crate::op_heads_store::OpHeadsStore;
+use crate::op_heads_store::{OpHeads, OpHeadsStore};
 use crate::op_store::{BranchTarget, OpStore, OperationId, RefTarget, WorkspaceId};
 use crate::operation::Operation;
 use crate::rewrite::DescendantRebaser;
 use crate::settings::{RepoSettings, UserSettings};
 use crate::simple_op_store::SimpleOpStore;
 use crate::store::Store;
-use crate::transaction::Transaction;
+use crate::transaction::{Transaction, UnpublishedOperation};
 use crate::view::{RefName, View};
 use crate::{backend, op_store};
 
@@ -332,9 +332,44 @@ impl RepoLoader {
     }
 
     pub fn load_at_head(&self) -> Arc<ReadonlyRepo> {
-        let op = self.op_heads_store.get_single_op_head(self).unwrap();
-        let view = View::new(op.view().take_store_view());
-        self._finish_load(op, view)
+        let op_heads = self.op_heads_store.get_heads(self).unwrap();
+        match op_heads {
+            OpHeads::Single(op) => {
+                let view = View::new(op.view().take_store_view());
+                self._finish_load(op, view)
+            }
+            OpHeads::Unresolved {
+                locked_op_heads,
+                op_heads,
+            } => {
+                let merged_repo = self.merge_op_heads(op_heads).leave_unpublished();
+                locked_op_heads.finish(merged_repo.operation());
+                merged_repo
+            }
+        }
     }
 
+    fn merge_op_heads(&self, mut op_heads: Vec<Operation>) -> UnpublishedOperation {
+        op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
+        let base_repo = self.load_at(&op_heads[0]);
+        let mut tx = base_repo.start_transaction("resolve concurrent operations");
+        let merged_repo = tx.mut_repo();
+        let neighbors_fn = |op: &Operation| op.parents();
+        for (i, other_op_head) in op_heads.iter().enumerate().skip(1) {
+            let ancestor_op = closest_common_node(
+                op_heads[0..i].to_vec(),
+                vec![other_op_head.clone()],
+                &neighbors_fn,
+                &|op: &Operation| op.id().clone(),
+            )
+            .unwrap();
+            let base_repo = self.load_at(&ancestor_op);
+            let other_repo = self.load_at(other_op_head);
+            merged_repo.merge(&base_repo, &other_repo);
+        }
+        let op_parent_ids = op_heads.iter().map(|op| op.id().clone()).collect();
+        tx.set_parents(op_parent_ids);
+        tx.write()
+    }
+
     pub fn load_at(&self, op: &Operation) -> Arc<ReadonlyRepo> {
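A note on the locking comment in `get_heads`: the method reads the heads optimistically and only takes the lock (and re-reads) when it sees more than one, so the common single-head case stays cheap. A generic sketch of that check-lock-recheck pattern, with all names invented and a `Mutex` standing in for the file lock:

use std::sync::Mutex;

// Invented stand-in for an op-heads store: the Vec plays the role of the
// on-disk set of head ids, and the Mutex plays the role of the file lock.
struct ToyHeadsStore {
    heads: Mutex<Vec<u64>>,
}

impl ToyHeadsStore {
    // Check-lock-recheck: a cheap read first, and only if it shows multiple
    // heads do we take the lock, re-read, and do the expensive merge. The
    // re-read matters because another process may have already merged the
    // heads between our first read and our lock acquisition.
    fn resolve(&self) -> u64 {
        if let [single] = self.heads.lock().unwrap()[..] {
            return single; // fast path: single head, nothing to resolve
        }
        let mut heads = self.heads.lock().unwrap();
        if let [single] = heads[..] {
            return single; // someone else resolved while we waited
        }
        // Stand-in for writing a merge operation that becomes the new head.
        let merged = heads.iter().copied().max().unwrap();
        *heads = vec![merged];
        merged
    }
}

fn main() {
    let store = ToyHeadsStore {
        heads: Mutex::new(vec![3, 7, 5]),
    };
    assert_eq!(store.resolve(), 7); // heads merged down to one
    assert_eq!(store.resolve(), 7); // fast path from now on
}

Unlike this Mutex, the real lock isn't needed for correctness; as the comment in `get_heads` says, it only keeps two processes from both writing a merge operation and producing a new pair of divergent heads.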