jj/lib/src/index_store.rs
Martin von Zweigbergk 779db67f8f index_store: avoid passing whole repo into get_index_at_op()
I want to be able to load the index at an operation before the repo
has been loaded.
2021-03-06 23:06:35 -08:00

223 lines
7.4 KiB
Rust

// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::commit::Commit;
use crate::dag_walk;
use crate::index::{MutableIndex, ReadonlyIndex};
use crate::op_store::OperationId;
use crate::operation::Operation;
use crate::store::CommitId;
use crate::store_wrapper::StoreWrapper;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::NamedTempFile;
pub struct IndexStore {
dir: PathBuf,
}
impl IndexStore {
pub fn init(dir: PathBuf) -> Self {
std::fs::create_dir(dir.join("operations")).unwrap();
IndexStore { dir }
}
pub fn reinit(&self) {
std::fs::remove_dir_all(self.dir.join("operations")).unwrap();
IndexStore::init(self.dir.clone());
}
pub fn load(dir: PathBuf) -> IndexStore {
IndexStore { dir }
}
pub fn get_index_at_op(&self, op: &Operation, store: &StoreWrapper) -> Arc<ReadonlyIndex> {
let op_id_hex = op.id().hex();
let op_id_file = self.dir.join("operations").join(&op_id_hex);
if op_id_file.exists() {
self.load_index_at_operation(store.hash_length(), op.id())
.unwrap()
} else {
self.index_at_operation(store, op).unwrap()
}
}
pub fn write_index(&self, index: MutableIndex) -> io::Result<Arc<ReadonlyIndex>> {
index.save_in(self.dir.clone())
}
fn load_index_at_operation(
&self,
hash_length: usize,
op_id: &OperationId,
) -> io::Result<Arc<ReadonlyIndex>> {
let op_id_file = self.dir.join("operations").join(op_id.hex());
let mut buf = vec![];
File::open(op_id_file)
.unwrap()
.read_to_end(&mut buf)
.unwrap();
let index_file_id_hex = String::from_utf8(buf).unwrap();
let index_file_path = self.dir.join(&index_file_id_hex);
let mut index_file = File::open(&index_file_path).unwrap();
ReadonlyIndex::load_from(
&mut index_file,
self.dir.clone(),
index_file_id_hex,
hash_length,
)
}
fn index_at_operation(
&self,
store: &StoreWrapper,
operation: &Operation,
) -> io::Result<Arc<ReadonlyIndex>> {
let view = operation.view();
let operations_dir = self.dir.join("operations");
let hash_length = store.hash_length();
let mut new_heads = view.heads().clone();
let mut parent_op_id: Option<OperationId> = None;
for op in dag_walk::bfs(
vec![operation.clone()],
Box::new(|op: &Operation| op.id().clone()),
Box::new(|op: &Operation| op.parents()),
) {
if operations_dir.join(op.id().hex()).is_file() {
if parent_op_id.is_none() {
parent_op_id = Some(op.id().clone())
}
} else {
for head in op.view().heads() {
new_heads.insert(head.clone());
}
}
}
let mut data;
let maybe_parent_file;
match parent_op_id {
None => {
maybe_parent_file = None;
data = MutableIndex::full(hash_length);
}
Some(parent_op_id) => {
let parent_file = self
.load_index_at_operation(hash_length, &parent_op_id)
.unwrap();
maybe_parent_file = Some(parent_file.clone());
data = MutableIndex::incremental(parent_file)
}
}
let mut heads: Vec<CommitId> = new_heads.into_iter().collect();
heads.sort();
let commits = topo_order_earlier_first(store, heads, maybe_parent_file);
for commit in &commits {
data.add_commit(&commit);
}
let index_file = data.save_in(self.dir.clone())?;
self.associate_file_with_operation(&index_file, operation.id())?;
Ok(index_file)
}
/// Records a link from the given operation to the this index version.
pub fn associate_file_with_operation(
&self,
index: &ReadonlyIndex,
op_id: &OperationId,
) -> io::Result<()> {
let mut temp_file = NamedTempFile::new_in(&self.dir)?;
let file = temp_file.as_file_mut();
file.write_all(&index.name().as_bytes()).unwrap();
temp_file.persist(&self.dir.join("operations").join(op_id.hex()))?;
Ok(())
}
}
// Returns the ancestors of heads with parents and predecessors come before the
// commit itself
fn topo_order_earlier_first(
store: &StoreWrapper,
heads: Vec<CommitId>,
parent_file: Option<Arc<ReadonlyIndex>>,
) -> Vec<Commit> {
// First create a list of all commits in topological order with
// children/successors first (reverse of what we want)
let mut work = vec![];
for head in &heads {
work.push(store.get_commit(head).unwrap());
}
let mut commits = vec![];
let mut visited = HashSet::new();
let mut in_parent_file = HashSet::new();
let parent_file_source = parent_file.as_ref().map(|file| file.as_ref());
while !work.is_empty() {
let commit = work.pop().unwrap();
if parent_file_source.map_or(false, |index| index.has_id(commit.id())) {
in_parent_file.insert(commit.id().clone());
continue;
} else if !visited.insert(commit.id().clone()) {
continue;
}
work.extend(commit.parents());
work.extend(commit.predecessors());
commits.push(commit);
}
drop(visited);
// Now create the topological order with earlier commits first. If we run into
// any commits whose parents/predecessors have not all been indexed, put
// them in the map of waiting commit (keyed by the commit they're waiting
// for). Note that the order in the graph doesn't really have to be
// topological, but it seems like a useful property to have.
// Commits waiting for their parents/predecessors to be added
let mut waiting = HashMap::new();
let mut result = vec![];
let mut visited = in_parent_file;
while !commits.is_empty() {
let commit = commits.pop().unwrap();
let mut waiting_for_earlier_commit = false;
for earlier in commit.parents().iter().chain(commit.predecessors().iter()) {
if !visited.contains(earlier.id()) {
waiting
.entry(earlier.id().clone())
.or_insert_with(Vec::new)
.push(commit.clone());
waiting_for_earlier_commit = true;
break;
}
}
if !waiting_for_earlier_commit {
visited.insert(commit.id().clone());
if let Some(dependents) = waiting.remove(commit.id()) {
commits.extend(dependents);
}
result.push(commit);
}
}
assert!(waiting.is_empty());
result
}