OpHeadsStore: read operation objects before calling merge_op_heads()

This is just a little refactoring to prepare for filtering out
ancestors earlier.
This commit is contained in:
Martin von Zweigbergk 2021-03-11 23:07:47 -08:00
parent 27293829d6
commit d4c39d399f

View file

@ -132,24 +132,38 @@ impl OpHeadsStore {
// only to avoid other concurrent processes from doing the same work (and // only to avoid other concurrent processes from doing the same work (and
// producing another set of divergent heads). // producing another set of divergent heads).
let _lock = self.lock(); let _lock = self.lock();
let op_heads = self.get_op_heads(); let op_head_ids = self.get_op_heads();
if op_heads.is_empty() { if op_head_ids.is_empty() {
return Err(OpHeadResolutionError::NoHeads); return Err(OpHeadResolutionError::NoHeads);
} }
if op_heads.len() == 1 { if op_head_ids.len() == 1 {
let op_head_id = op_heads[0].clone(); let op_head_id = op_head_ids[0].clone();
let op_head = op_store.read_operation(&op_head_id).unwrap(); let op_head = op_store.read_operation(&op_head_id).unwrap();
// Return early so we don't write a merge operation with a single parent // Return early so we don't write a merge operation with a single parent
let view = op_store.read_view(&op_head.view_id).unwrap(); let view = op_store.read_view(&op_head.view_id).unwrap();
return Ok((op_head_id, op_head, view)); return Ok((op_head_id, op_head, view));
} }
let op_heads: Vec<_> = op_head_ids
.iter()
.map(|op_id: &OperationId| {
let data = op_store.read_operation(op_id).unwrap();
Operation::new(op_store.clone(), op_id.clone(), data)
})
.collect();
let neighbors_fn = |op: &Operation| op.parents();
// Remove ancestors so we don't create merge operation with an operation and its
// ancestor
let op_heads =
dag_walk::unreachable(op_heads, &neighbors_fn, &|op: &Operation| op.id().clone());
let op_heads: Vec<_> = op_heads.into_iter().collect();
let (merge_operation_id, merge_operation, merged_view) = let (merge_operation_id, merge_operation, merged_view) =
merge_op_heads(store, op_store, index_store, &op_heads)?; merge_op_heads(store, op_store, index_store, op_heads)?;
self.add_op_head(&merge_operation_id); self.add_op_head(&merge_operation_id);
for old_op_head_id in op_heads { for old_op_head_id in op_head_ids {
// The merged one will be in the input to the merge if it's a "fast-forward" // The merged one will be in the input to the merge if it's a "fast-forward"
// merge. // merge.
if old_op_head_id != merge_operation_id { if old_op_head_id != merge_operation_id {
@ -164,21 +178,8 @@ fn merge_op_heads(
store: &StoreWrapper, store: &StoreWrapper,
op_store: &Arc<dyn OpStore>, op_store: &Arc<dyn OpStore>,
index_store: &Arc<IndexStore>, index_store: &Arc<IndexStore>,
op_head_ids: &[OperationId], mut op_heads: Vec<Operation>,
) -> Result<(OperationId, op_store::Operation, op_store::View), OpHeadResolutionError> { ) -> Result<(OperationId, op_store::Operation, op_store::View), OpHeadResolutionError> {
let op_heads: Vec<_> = op_head_ids
.iter()
.map(|op_id: &OperationId| {
let data = op_store.read_operation(op_id).unwrap();
Operation::new(op_store.clone(), op_id.clone(), data)
})
.collect();
let neighbors_fn = |op: &Operation| op.parents();
// Remove ancestors so we don't create merge operation with an operation and its
// ancestor
let op_heads =
dag_walk::unreachable(op_heads, &neighbors_fn, &|op: &Operation| op.id().clone());
let mut op_heads: Vec<_> = op_heads.into_iter().collect();
op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone()); op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
let first_op_head = op_heads[0].clone(); let first_op_head = op_heads[0].clone();
let mut merged_view = op_store.read_view(first_op_head.view().id()).unwrap(); let mut merged_view = op_store.read_view(first_op_head.view().id()).unwrap();
@ -192,6 +193,7 @@ fn merge_op_heads(
)); ));
} }
let neighbors_fn = |op: &Operation| op.parents();
let base_index = index_store.get_index_at_op(&first_op_head, store); let base_index = index_store.get_index_at_op(&first_op_head, store);
let mut index = MutableIndex::incremental(base_index); let mut index = MutableIndex::incremental(base_index);
for (i, other_op_head) in op_heads.iter().enumerate().skip(1) { for (i, other_op_head) in op_heads.iter().enumerate().skip(1) {