Format with cargo fmt

This commit is contained in:
Jacob Hinkle 2022-11-22 09:16:03 -05:00
parent 55c97a2740
commit 1badaeefb1
6 changed files with 193 additions and 160 deletions

View File

@ -112,8 +112,10 @@ pub enum FindDatasetError {
NoPathsProvided,
#[error("Something went wrong when parsing paths")]
PathError(std::io::Error),
#[error("None of the search paths belongs to an existing dataset. Returned path is the nearest
common ancestor of all searched paths (on same filesystem)")]
#[error(
"None of the search paths belongs to an existing dataset. Returned path is the nearest
common ancestor of all searched paths (on same filesystem)"
)]
NoDataset(PathBuf),
#[error("Some, but not all, search paths do not reside in an existing dataset")]
SomeNotInDataset,

View File

@ -97,8 +97,7 @@ pub struct Environment {
}
impl Environment {
pub fn current(addl_info: Option<String>) -> Self {
let lor_result: Result<String, IOError> = (
|| {
let lor_result: Result<String, IOError> = (|| {
let mut s = String::new();
let mut f = File::open("/etc/os-release")?;
f.read_to_string(&mut s)?;
@ -108,14 +107,7 @@ impl Environment {
let user = User::current();
let os_release = sys_info::os_release().unwrap_or_else(|_| "Unknown".to_string());
//let env_vars = HashMap::from_iter(env::vars());
let key = Uuid::new_v5(
&user.key,
format!(
"{}\n{}",
os_release,
lor,
).as_bytes(),
);
let key = Uuid::new_v5(&user.key, format!("{}\n{}", os_release, lor,).as_bytes());
Environment {
key,
user,

View File

@ -4,7 +4,7 @@ use jwalk::{Error as JWalkError, WalkDir};
use log;
use rayon::prelude::*;
use ring::digest::{Context, SHA256};
use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes};
use rusqlite::{types as rstypes, Error as RSError, Result as RSResult, ToSql, Transaction};
use thiserror::Error;
use uuid::Uuid;
@ -17,7 +17,6 @@ use std::time::{Instant, SystemTime};
use crate::timing::persistent_stamp;
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum FileType {
Other = 0, // char/block devices, FIFOs, sockets...
@ -114,7 +113,6 @@ fn buffered_hash256<R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> {
Ok((hash.into(), num_bytes))
}
/// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for
/// directories and insert entries into current_files table as we pass over them. This means
/// computing the sha256 key, storing it along with parent, filetype, and symlink_target.
@ -122,7 +120,8 @@ fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(),
let mut insert_stmt = tx.prepare(
"INSERT OR IGNORE INTO current_files
(sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time)
VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")?;
VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
)?;
let mut dirstack: Vec<Option<Hash256>> = vec![];
let mut parent: Option<Hash256> = None;
let mut lastkey: Option<Hash256> = None;
@ -135,13 +134,16 @@ fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(),
for entry_result in WalkDir::new(p)
.follow_links(false)
.skip_hidden(true)
.sort(true) {
.sort(true)
{
let entry = entry_result?;
let path = entry.path();
let pathstr = path.to_str()
let pathstr = path
.to_str()
.expect("Path will be convertable to a UTF-8 string");
let filetype = FileType::from(entry.file_type());
let filename = path.file_name()
let filename = path
.file_name()
.expect("Path will end in a named component (unlike / or foo/..)")
.to_str()
.expect("File name will be convertable to a UTF-8 string");
@ -151,9 +153,11 @@ fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(),
std::fs::read_link(&path)?
.to_str()
.expect("Symlink target will be convertable to a UTF-8 string")
.to_string()
.to_string(),
)
} else { None };
} else {
None
};
if entry.depth > depth {
// descending into directory
@ -178,7 +182,8 @@ fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(),
Some(Hash256(b)) => {
ctx.update(&b[..]);
}
None => { // Root. hash is the hash of the uuid for this dataset
None => {
// Root. hash is the hash of the uuid for this dataset
ctx.update(&path_key.0);
}
}
@ -210,31 +215,37 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> {
log::trace!("insert_file_content_hashes");
// Extract all regular files (key and path) from current_files, in arbitrary order, and compute
// content hashes for each, then update the corresponding entry in current_files.
let allpaths: Vec<RSResult<(i32, String)>> = tx.prepare(
&format!(
let allpaths: Vec<RSResult<(i32, String)>> = tx
.prepare(&format!(
"SELECT id, filepath FROM current_files WHERE filetype == {}",
FileType::Regular as u8)
)?.query_map((), |row| {
FileType::Regular as u8
))?
.query_map((), |row| {
let id = row.get::<usize, i32>(0).unwrap();
let filepath = row.get::<usize, String>(1).unwrap();
//println!("{} {}", id, filepath);
Ok((id, filepath))
}).unwrap().collect::<Vec<
RSResult<(i32, String)>>>();
})
.unwrap()
.collect::<Vec<RSResult<(i32, String)>>>();
let start_systemtime = SystemTime::now();
let start_instant = Instant::now();
let hashes: Vec<IOResult<(i32, Hash256, usize, Instant)>> = allpaths.par_iter().map(|fpres| {
let hashes: Vec<IOResult<(i32, Hash256, usize, Instant)>> = allpaths
.par_iter()
.map(|fpres| {
if let Ok((id, filepath)) = fpres {
let input = File::open(filepath)?;
let reader = BufReader::new(input);
let (chash, size_bytes) = buffered_hash256(reader)?;
Ok((*id, chash, size_bytes, Instant::now()))
} else { // should never happen
} else {
// should never happen
Ok((0_i32, Hash256([0; 32]), 0, Instant::now()))
}
}).collect();
})
.collect();
let mut update_stmt = tx.prepare(
"UPDATE current_files
SET
@ -242,7 +253,8 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> {
size_bytes = ?3,
recorded_time = ?4
WHERE
id = ?1")?;
id = ?1",
)?;
for (id, hash, size_bytes, recorded_instant) in hashes.into_iter().flatten() {
update_stmt.execute((
id,
@ -294,8 +306,13 @@ fn compute_current_directory_hashes(_tx: &Transaction) -> Result<(), RecordError
}
/// Transfer rows in current_files and deleted_files into new rows in filedir_version
fn persist_temp_tables(tx: &Transaction, task_uuid: Uuid, dataset_uuid: Uuid) -> Result<(), RecordError> {
tx.execute("
fn persist_temp_tables(
tx: &Transaction,
task_uuid: Uuid,
dataset_uuid: Uuid,
) -> Result<(), RecordError> {
tx.execute(
"
INSERT OR IGNORE INTO
filedir
SELECT
@ -304,7 +321,8 @@ fn persist_temp_tables(tx: &Transaction, task_uuid: Uuid, dataset_uuid: Uuid) ->
",
(dataset_uuid.as_bytes(),),
)?;
tx.execute("
tx.execute(
"
INSERT INTO
filedir_version
SELECT
@ -336,7 +354,8 @@ pub fn record(
.query_row([], |row| {
let uuidstr: Result<String, RSError> = row.get(0);
uuidstr
})?.as_str()
})?
.as_str(),
)?;
log::debug!("Found dataset UUID: {}", u);
@ -351,7 +370,8 @@ pub fn record(
// This schema is like filedir joined with filedir_version
// TODO: revert this to a TEMP TABLE after it's debugged
tx.execute("CREATE TEMP TABLE current_files (
tx.execute(
"CREATE TEMP TABLE current_files (
id INTEGER PRIMARY KEY NOT NULL, -- only used in this table
sha256 BLOB NOT NULL, -- will become the primary key on filedir
name TEXT NOT NULL, -- filename without path
@ -365,7 +385,9 @@ pub fn record(
size_bytes INTEGER, -- null for anything but files/dirs. For dirs, the sum of children
content_sha256 BLOB,
UNIQUE(sha256) -- prevent inserting same file twice
)", [])?;
)",
[],
)?;
for p in paths {
let p = p.canonicalize()?;
@ -377,8 +399,11 @@ pub fn record(
// UUID to indicate the root directory, then taking the SHA256 of the parent plus filename,
// as we descend into subdirectories until we reach p.
let mut ancestors = LinkedList::new();
for a in p.ancestors() { // fill a stack of ancestor dirs
if a.canonicalize()? == ds_root.canonicalize()? { break };
for a in p.ancestors() {
// fill a stack of ancestor dirs
if a.canonicalize()? == ds_root.canonicalize()? {
break;
};
ancestors.push_front(a);
}
@ -386,7 +411,8 @@ pub fn record(
let mut prev_key = root_key;
for a in ancestors {
// hash of parent hash + filename
let filename = a.file_name()
let filename = a
.file_name()
.ok_or_else(|| RecordError::CantGetFilename(a.to_owned()))?
.to_str()
.ok_or_else(|| RecordError::FilenameNotUTF8(a.to_owned()))?;

View File

@ -41,26 +41,21 @@ enum Command {
}
fn init_schema(conn: &mut Connection, name: &str) -> Result<Uuid> {
let dataset_uuid = nancy::db::init(conn, name)
.map_err(|e| anyhow!("failed to initialize schema: {}", e))?;
let dataset_uuid =
nancy::db::init(conn, name).map_err(|e| anyhow!("failed to initialize schema: {}", e))?;
log::trace!("Init OK");
log::info!("Dataset UUID is {dataset_uuid}");
// Run an empty program so that the dataset log reflects when it was
// initialized
nancy::program::with_program(
conn,
"INIT",
"Initialize dataset",
|prog| {
nancy::program::with_program(conn, "INIT", "Initialize dataset", |prog| {
let _ = prog.perform_task(&[], |task| {
log::debug!("INIT task UUID is {}", task.key);
Ok::<(), ()>(())
});
let okres: Result<()> = Ok(());
okres
},
)
})
.context("Could not run empty program during init_schema")??;
Ok(dataset_uuid)
@ -69,12 +64,18 @@ fn init_schema(conn: &mut Connection, name: &str) -> Result<Uuid> {
/// Run init subcommand and return the return code
fn init_cmd(name: &str, dataset_path: &Path) -> Result<Uuid> {
if !dataset_path.is_dir() {
bail!("Path {:?} does not point to an existing directory", dataset_path);
bail!(
"Path {:?} does not point to an existing directory",
dataset_path
);
}
let dbpath = &dataset_path.join("nancy.db");
if dbpath.exists() {
bail!("Database {:?} exists, indicating this dataset is already \
initialized. Refusing to overwrite.", dbpath);
bail!(
"Database {:?} exists, indicating this dataset is already \
initialized. Refusing to overwrite.",
dbpath
);
}
log::info!("Initializing new database at {:?}", dbpath);
let mut conn = Connection::open(dbpath)
@ -99,7 +100,8 @@ fn record_cmd(message: &str, record_paths: &Vec<PathBuf>) -> Result<()> {
let mut conn = Connection::open_with_flags(
dbpath,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
).context("Could not open existing SQLite database at {dbpath:?}: {e:?}")?;
)
.context("Could not open existing SQLite database at {dbpath:?}: {e:?}")?;
conn.pragma_update(None, "foreign_keys", &"ON")?;
@ -117,7 +119,8 @@ fn record_cmd(message: &str, record_paths: &Vec<PathBuf>) -> Result<()> {
task.key,
)
})
})??.map_err(|e| anyhow!("Record program failed: {}", e))
})??
.map_err(|e| anyhow!("Record program failed: {}", e))
}
fn main() -> Result<()> {
@ -125,10 +128,9 @@ fn main() -> Result<()> {
let args = Cli::parse();
match &args.command {
Command::Init {
name,
dataset_path
} => { init_cmd(name, dataset_path)?; },
Command::Init { name, dataset_path } => {
init_cmd(name, dataset_path)?;
}
Command::Record {
message,
record_paths,

View File

@ -32,17 +32,15 @@ impl Task {
log::debug!("New Task with UUID {}", key);
// NOTE: we currently do not save the "function" column
program.transaction.execute(
program
.transaction
.execute(
"INSERT INTO task VALUES (?1, ?2, NULL)",
(
key.as_bytes(),
program.key.as_bytes(),
),
).map_err(TaskError::InsertTaskFailed)?;
(key.as_bytes(), program.key.as_bytes()),
)
.map_err(TaskError::InsertTaskFailed)?;
Ok(Task {
key,
})
Ok(Task { key })
}
}
@ -85,7 +83,11 @@ pub struct Program<'conn> {
state: ProgramState,
}
impl<'conn> Program<'conn> {
pub fn new(tx: &'conn Transaction, name: &str, message: &str) -> Result<Program<'conn>, ProgramError> {
pub fn new(
tx: &'conn Transaction,
name: &str,
message: &str,
) -> Result<Program<'conn>, ProgramError> {
let log_target = &format!("nancy.program ({})", name);
// start transaction
@ -116,11 +118,13 @@ impl<'conn> Program<'conn> {
state: ProgramState::Initialized,
};
prog.environment.record(prog.transaction)
prog.environment
.record(prog.transaction)
.map_err(ProgramError::RecordEnvFailed)?;
log::debug!("Environment: {:#?}", prog.environment);
prog.transaction.execute(
prog.transaction
.execute(
"INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)",
(
key.as_bytes(),
@ -128,7 +132,8 @@ impl<'conn> Program<'conn> {
name,
message,
),
).map_err(ProgramError::InsertProgramFailed)?;
)
.map_err(ProgramError::InsertProgramFailed)?;
Ok(prog)
}
@ -155,15 +160,14 @@ impl<'conn> Program<'conn> {
"Elapsed: {} seconds",
(end - self.start_instant).as_secs_f64()
);
let end_stamp = timing::persistent_stamp(
end,
self.start_instant,
self.start_systemtime,
);
let end_stamp = timing::persistent_stamp(end, self.start_instant, self.start_systemtime);
self.transaction.execute("UPDATE program SET end_time = ?1 WHERE key = ?2",
self.transaction
.execute(
"UPDATE program SET end_time = ?1 WHERE key = ?2",
(end_stamp, self.key.as_bytes()),
).map_err(ProgramError::RecordTimestampsFailed)?;
)
.map_err(ProgramError::RecordTimestampsFailed)?;
Ok(())
}
@ -172,7 +176,10 @@ impl<'conn> Drop for Program<'conn> {
/// Checks that the program has been cleaned up by setting state to Finished
fn drop(&mut self) {
if self.state != ProgramState::Finished {
log::error!("Program reached destructor with unfinished state={:?}", self.state);
log::error!(
"Program reached destructor with unfinished state={:?}",
self.state
);
}
}
}
@ -185,7 +192,8 @@ pub fn with_program<F, R, E>(
f: F,
) -> Result<Result<R, E>, ProgramError>
where
F: FnOnce(&mut Program) -> Result<R, E> {
F: FnOnce(&mut Program) -> Result<R, E>,
{
// NOTE: for Errors outside of f(prog), we should not rely on ?, so that we can keep the
// From instance for ProgramError as general as possible. Instead, we should manually check
// errors in the surrounding code in this function and only use ? for f(prog).
@ -199,8 +207,9 @@ where
message
);
let tx = conn.transaction()
.map_err(|e| { ProgramError::CreateTransactionFailed(e) })?;
let tx = conn
.transaction()
.map_err(|e| ProgramError::CreateTransactionFailed(e))?;
let mut prog = Program::new(&tx, name, message)?;
@ -215,14 +224,17 @@ where
prog.record_timestamps()?;
// commit transaction
log::debug!(target: log_target, "Committing transaction and finalizing program");
log::debug!(
target: log_target,
"Committing transaction and finalizing program"
);
prog.state = ProgramState::Finished;
log::debug!(target: log_target, "Set prog.state to Finished");
drop(prog); // stop borrowing tx, since tx.commit() will consume tx
log::debug!(target: log_target, "Dropped prog");
tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })?;
tx.commit().map_err(|e| ProgramError::CommitFailed(e))?;
}
Ok(ret)

View File

@ -1,11 +1,11 @@
use assert_cmd::prelude::*;
use assert_fs::prelude::*;
use predicates::prelude::*;
//use assert_fs::prelude::*;
//use predicates::prelude::*;
use std::fs;
use std::io::Write;
use std::path::{Path};
use std::process::{Command};
use std::path::Path;
use std::process::Command;
fn empty_dir() -> Result<assert_fs::TempDir, assert_fs::fixture::FixtureError> {
let dir = assert_fs::TempDir::new()?;
@ -15,7 +15,8 @@ fn empty_dir() -> Result<assert_fs::TempDir, assert_fs::fixture::FixtureError> {
fn init_dir(dir: &Path) -> Result<(), assert_cmd::cargo::CargoError> {
let mut cmd = Command::cargo_bin("nancy")?;
cmd.arg("init")
.arg("--name").arg("Temporary data directory")
.arg("--name")
.arg("Temporary data directory")
.arg(dir)
.assert()
.success();
@ -28,11 +29,7 @@ fn add_some_files(dir: &Path) -> Result<(), Box<dyn std::error::Error>> {
let bazpath = subdir.join("baz.txt");
fs::create_dir_all(subdir)?;
let mut bazfile = fs::File::create(bazpath)?;
write!(
bazfile,
"{}",
"bat",
)?;
write!(bazfile, "{}", "bat",)?;
Ok(())
}
@ -41,7 +38,8 @@ fn add_some_files(dir: &Path) -> Result<(), Box<dyn std::error::Error>> {
fn init_missing_dir() -> Result<(), Box<dyn std::error::Error>> {
let mut cmd = Command::cargo_bin("nancy")?;
cmd.arg("init")
.arg("--name").arg("Missing data directory")
.arg("--name")
.arg("Missing data directory")
.arg("/path/to/missing/dir")
.assert()
.failure();
@ -49,7 +47,6 @@ fn init_missing_dir() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
#[test]
fn init_empty_dir() -> Result<(), Box<dyn std::error::Error>> {
let dir = empty_dir()?;
@ -61,8 +58,10 @@ fn init_empty_dir() -> Result<(), Box<dyn std::error::Error>> {
fn record_dir(dir: &Path) -> Result<assert_cmd::assert::Assert, assert_cmd::cargo::CargoError> {
let mut cmd = Command::cargo_bin("nancy")?;
Ok(cmd.arg("record")
.arg("--message").arg("Test recording")
Ok(cmd
.arg("record")
.arg("--message")
.arg("Test recording")
.arg(dir)
.assert()
.success())