op_heads_store: move lock and updating of heads on disk to new type (#111)

We had a few lines of similar code where we added a new head of the
operation log and then removed the old heads. By moving that code into
a new type, we prepare for further refactorings.
Martin von Zweigbergk 2022-03-19 22:55:32 -07:00
committed by Martin von Zweigbergk
parent 39838a76bf
commit 40aa9741ec
2 changed files with 25 additions and 17 deletions
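
For orientation before the diff: below is a minimal, self-contained sketch of the
guard pattern this commit introduces. It is not the crate's real code; `Operation`
is simplified to plain `String` ids, and the `FileLock` the real guard holds is
only mentioned in a comment. The point is the shape of the API: callers obtain a
`LockedOpHeads` guard and can only rewrite the heads through it.

    use std::sync::Arc;

    // Simplified stand-in for the crate's Operation type (hypothetical fields).
    struct Operation {
        id: String,
        parent_ids: Vec<String>,
    }

    struct OpHeadsStore;

    impl OpHeadsStore {
        fn add_op_head(&self, id: &str) {
            println!("add op head {id}");
        }

        fn remove_op_head(&self, id: &str) {
            println!("remove op head {id}");
        }

        // Hands out a guard that keeps the store alive; the real method also
        // acquires a FileLock and stores it in the guard so the lock is held
        // for the guard's whole lifetime.
        fn lock(self: &Arc<Self>) -> LockedOpHeads {
            LockedOpHeads {
                store: self.clone(),
            }
        }
    }

    struct LockedOpHeads {
        store: Arc<OpHeadsStore>,
    }

    impl LockedOpHeads {
        // Consumes the guard: add the new head, then drop its parents, then
        // (by dropping self) release the lock.
        fn finish(self, new_op: &Operation) {
            self.store.add_op_head(&new_op.id);
            for old_id in &new_op.parent_ids {
                self.store.remove_op_head(old_id);
            }
        }
    }

    fn main() {
        let store = Arc::new(OpHeadsStore);
        let op = Operation {
            id: "b".to_string(),
            parent_ids: vec!["a".to_string()],
        };
        // Replaces call sites that previously used store.update_op_heads(&op).
        store.lock().finish(&op);
    }

Because `finish` consumes the guard, the compiler enforces that heads are updated
at most once per lock acquisition, and heads cannot be updated without taking the
lock first.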


@@ -40,6 +40,20 @@ pub enum OpHeadResolutionError {
     NoHeads,
 }
 
+pub struct LockedOpHeads {
+    store: Arc<OpHeadsStore>,
+    _lock: FileLock,
+}
+
+impl LockedOpHeads {
+    pub fn finish(self, new_op: &Operation) {
+        self.store.add_op_head(new_op.id());
+        for old_id in new_op.parent_ids() {
+            self.store.remove_op_head(old_id);
+        }
+    }
+}
+
 impl OpHeadsStore {
     pub fn init(
         dir: PathBuf,
@@ -66,7 +80,7 @@ impl OpHeadsStore {
         OpHeadsStore { dir }
     }
 
-    pub fn add_op_head(&self, id: &OperationId) {
+    fn add_op_head(&self, id: &OperationId) {
         std::fs::write(self.dir.join(id.hex()), "").unwrap();
     }
 
@@ -90,20 +104,16 @@ impl OpHeadsStore {
         op_heads
     }
 
-    fn lock(&self) -> FileLock {
-        FileLock::lock(self.dir.join("lock"))
-    }
-
-    pub fn update_op_heads(&self, op: &Operation) {
-        let _op_heads_lock = self.lock();
-        self.add_op_head(op.id());
-        for old_parent_id in op.parent_ids() {
-            self.remove_op_head(old_parent_id);
+    pub fn lock(self: &Arc<Self>) -> LockedOpHeads {
+        let lock = FileLock::lock(self.dir.join("lock"));
+        LockedOpHeads {
+            store: self.clone(),
+            _lock: lock,
         }
     }
 
     pub fn get_single_op_head(
-        &self,
+        self: &Arc<Self>,
         repo_loader: &RepoLoader,
     ) -> Result<Operation, OpHeadResolutionError> {
         let mut op_heads = self.get_op_heads();
@@ -128,7 +138,7 @@ impl OpHeadsStore {
         // Note that the locking isn't necessary for correctness; we take the lock
         // only to avoid other concurrent processes from doing the same work (and
         // producing another set of divergent heads).
-        let _lock = self.lock();
+        let locked_op_heads = self.lock();
 
         let op_head_ids = self.get_op_heads();
         if op_head_ids.is_empty() {
@@ -158,10 +168,7 @@ impl OpHeadsStore {
         let merged_repo = merge_op_heads(repo_loader, op_heads)?.leave_unpublished();
         let merge_operation = merged_repo.operation().clone();
-        self.add_op_head(merge_operation.id());
-        for old_op_head_id in merge_operation.parent_ids() {
-            self.remove_op_head(old_op_head_id);
-        }
+        locked_op_heads.finish(&merge_operation);
 
         // TODO: Change the return type include the repo if we have it (as we do here)
         Ok(merge_operation)
     }


@@ -135,7 +135,8 @@ impl UnpublishedOperation {
         let data = self.data.take().unwrap();
         self.repo_loader
             .op_heads_store()
-            .update_op_heads(&data.operation);
+            .lock()
+            .finish(&data.operation);
         let repo = self
             .repo_loader
             .create_from(data.operation, data.view, data.index);