Switch to using anyhow in main.rs and thiserror in libs

This commit is contained in:
Jacob Hinkle 2022-11-22 09:15:29 -05:00
parent 5151703dcc
commit 55c97a2740
7 changed files with 258 additions and 307 deletions

28
Cargo.lock generated
View File

@ -22,6 +22,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "anyhow"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6"
[[package]]
name = "arrayref"
version = "0.3.6"
@ -562,6 +568,7 @@ dependencies = [
name = "nancy"
version = "0.1.0"
dependencies = [
"anyhow",
"assert_cmd",
"assert_fs",
"blake3",
@ -578,6 +585,7 @@ dependencies = [
"rusqlite",
"rusqlite_migration",
"sys-info",
"thiserror",
"uuid",
"whoami",
]
@ -918,6 +926,26 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95059e91184749cb66be6dc994f67f182b6d897cb3df74a5bf66b5e709295fd8"
[[package]]
name = "thiserror"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.4"

View File

@ -24,6 +24,7 @@ assert_fs = "1.0.9"
predicates = "2.1.3"
[dependencies]
anyhow = "1.0.66"
blake3 = "1.3.1"
clap = { version = "4.0.14", features = ["derive"] }
derive_more = "0.99.17"
@ -37,5 +38,6 @@ ring = "0.16.20"
rusqlite = { version = "0.28.0", features = ["uuid"] }
rusqlite_migration = "1.0.0"
sys-info = "0.9.1"
thiserror = "1.0.37"
uuid = { version = "1.2.1", features = ["v4", "v5"] }
whoami = "1.2.3"

View File

@ -3,14 +3,16 @@
//! This module is mostly for managing migrations. This does not encapsulate all database accesses.
//!
use anyhow::Result;
extern crate derive_more;
use derive_more::{Display, Error, From};
use derive_more::{Display, From};
use log;
use once_cell::sync::Lazy;
use rusqlite;
use rusqlite::Connection;
use rusqlite_migration::{Error as RMError, Migrations, SchemaVersion, M};
use thiserror::Error;
use uuid;
use std::collections::HashSet;
@ -26,7 +28,7 @@ static MIGRATIONS: Lazy<Migrations<'static>> = Lazy::new(|| {
pub static CURRENT_SCHEMA_VERSION: usize = 1;
/// Error type for checking schema version of a Connection
#[derive(Debug, Display)]
#[derive(Debug, Display, Error)]
pub enum SchemaError {
CurrentVersionError(RMError),
NoSchemaVersionSet,
@ -86,7 +88,7 @@ pub fn ensure_schema(conn: &mut Connection) -> Result<SchemaUpdateResult, Schema
let current_version = unsafe { NonZeroUsize::new_unchecked(CURRENT_SCHEMA_VERSION) };
Ok(SchemaUpdateResult {
old_version: None,
current_version: current_version,
current_version,
updated: old_version == current_version,
})
}
@ -104,24 +106,22 @@ pub fn local_uuid(conn: &Connection) -> Result<uuid::Uuid, LocalUuidError> {
Ok(uuid)
}
#[derive(Debug)]
#[derive(Debug, Error)]
pub enum FindDatasetError {
/// An empty list of paths was provided
#[error("An empty list of paths was provided")]
NoPathsProvided,
/// Something went wrong when parsing paths
#[error("Something went wrong when parsing paths")]
PathError(std::io::Error),
/// None of the search paths belongs to an existing dataset. Returned path is the nearest
/// common ancestor of all searched paths (on same filesystem)
#[error("None of the search paths belongs to an existing dataset. Returned path is the nearest
common ancestor of all searched paths (on same filesystem)")]
NoDataset(PathBuf),
/// Some, but not all, search paths do not reside in an existing dataset
#[error("Some, but not all, search paths do not reside in an existing dataset")]
SomeNotInDataset,
/// Searched paths belong to multiple existing datasets (or some may belong to None)
#[error("Searched paths belong to multiple existing datasets (or some may belong to None)")]
MultipleDatasets {
datasets: Vec<PathBuf>,
some_paths_not_in_dataset: bool,
},
// TODO: REMOVE THIS
NotImplemented,
}
impl From<std::io::Error> for FindDatasetError {
fn from(e: std::io::Error) -> FindDatasetError {
@ -142,7 +142,7 @@ pub fn find_dataset_dir(paths: &[PathBuf]) -> Result<PathBuf, FindDatasetError>
.canonicalize()?;
log::debug!("First path is {:?}", first_path);
let mut common_path = first_path.to_path_buf();
let mut common_path = first_path;
log::debug!("First path as PathBuf is {:?}", common_path);
let mut found_common_path = false;
@ -189,7 +189,7 @@ pub fn find_dataset_dir(paths: &[PathBuf]) -> Result<PathBuf, FindDatasetError>
log::debug!("Did not find a common path");
}
if ds_dirs.len() == 0 {
if ds_dirs.is_empty() {
Err(FindDatasetError::NoDataset(common_path))
} else if ds_dirs.len() == 1 {
let d = ds_dirs

View File

@ -1,12 +1,10 @@
use machine_uid;
use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes};
use rusqlite::{Result as RSResult, Transaction};
use sys_info;
use uuid::{Uuid};
use uuid::Uuid;
use whoami;
use std::collections::{HashMap};
use std::env;
use std::fs::{File};
use std::fs::File;
use std::io::{Error as IOError, Read};
/// Some simple namespaces to use for deriving UUID v5 keys
@ -21,21 +19,21 @@ pub struct Machine {
}
impl Machine {
pub fn current() -> Self {
let hostname = sys_info::hostname().unwrap_or("Unknown".to_string());
let mid = machine_uid::get().unwrap_or("Unknown".to_string());
let os_type = sys_info::os_type().unwrap_or("Unknown".to_string());
let hostname = sys_info::hostname().unwrap_or_else(|_| "Unknown".to_string());
let mid = machine_uid::get().unwrap_or_else(|_| "Unknown".to_string());
let os_type = sys_info::os_type().unwrap_or_else(|_| "Unknown".to_string());
let key = Uuid::new_v5(
&NAMESPACE_MACHINE,
format!("{},{},{}", hostname, mid, os_type).as_bytes(),
);
Machine {
key: key,
key,
machine_id: mid,
hostname: hostname,
os_type: os_type,
hostname,
os_type,
}
}
pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> {
pub fn record(&self, tx: &Transaction) -> RSResult<()> {
tx.execute(
"INSERT OR IGNORE INTO machine VALUES (?1, ?2, ?3, ?4)",
(
@ -66,14 +64,14 @@ impl User {
format!("{}\n{}", username, realname).as_bytes(),
);
User {
key: key,
machine: machine,
username: username,
realname: realname,
key,
machine,
username,
realname,
}
}
pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> {
self.machine.record(tx);
pub fn record(&self, tx: &Transaction) -> RSResult<()> {
self.machine.record(tx)?;
tx.execute(
"INSERT OR IGNORE INTO user VALUES (?1, ?2, ?3, ?4)",
(
@ -106,9 +104,9 @@ impl Environment {
f.read_to_string(&mut s)?;
Ok(s)
})();
let lor = lor_result.unwrap_or("Unknown".to_string());
let lor = lor_result.unwrap_or_else(|_| "Unknown".to_string());
let user = User::current();
let os_release = sys_info::os_release().unwrap_or("Unknown".to_string());
let os_release = sys_info::os_release().unwrap_or_else(|_| "Unknown".to_string());
//let env_vars = HashMap::from_iter(env::vars());
let key = Uuid::new_v5(
&user.key,
@ -119,16 +117,16 @@ impl Environment {
).as_bytes(),
);
Environment {
key: key,
user: user,
os_release: os_release,
key,
user,
os_release,
linux_os_release: lor,
//env_vars: env_vars,
additional_info: addl_info,
}
}
pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> {
self.user.record(tx);
pub fn record(&self, tx: &Transaction) -> RSResult<()> {
self.user.record(tx)?;
tx.execute(
"INSERT OR IGNORE INTO environment VALUES (?1, ?2, ?3, ?4, NULL, ?5)",
(

118
src/fs.rs
View File

@ -1,19 +1,21 @@
use derive_more::{From};
use jwalk::{Error as JWalkError, WalkDir, WalkDirGeneric};
use anyhow::Result;
use derive_more::From;
use jwalk::{Error as JWalkError, WalkDir};
use log;
use rayon::prelude::*;
use ring::digest::{Context, SHA256};
use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes};
use uuid::{Uuid};
use thiserror::Error;
use uuid::Uuid;
use std::collections::{LinkedList};
use std::collections::LinkedList;
use std::fmt;
use std::fs::{File};
use std::fs::File;
use std::io::{BufReader, Error as IOError, Read, Result as IOResult};
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime};
use crate::timing::{persistent_stamp};
use crate::timing::persistent_stamp;
#[derive(Copy, Clone, Debug, PartialEq)]
@ -38,59 +40,29 @@ impl From<std::fs::FileType> for FileType {
}
impl fmt::Display for FileType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self);
write!(f, "{:?}", self)?;
Ok(())
}
}
#[derive(Debug)]
struct FSVersion {
uuid: Uuid,
//content_sha256: [u8; 32],
//filename: String,
//parent: [u8; 32],
}
impl FSVersion {
fn from_file(path: &Path, dataset_uuid: Uuid, dataset_root: &Path) -> FSVersion {
let u = Uuid::new_v4();
//let hasher = Sha256::new();
FSVersion {
uuid: u,
//filename: "",
//content_sha256: hasher.finalize(),
}
}
}
#[derive(Debug, From)]
#[derive(Debug, Error)]
pub enum RecordError {
SQLError(RSError),
IOError(IOError),
DirectoryWalkError(JWalkError),
UUIDParseError(uuid::Error),
CantGetFilename,
FilenameNotUTF8,
#[error("SQLite error: {0}")]
SQLError(#[from] RSError),
#[error("IO error: {0}")]
IOError(#[from] IOError),
#[error("error walking directory: {0}")]
DirectoryWalkError(#[from] JWalkError),
#[error("error parsing UUID: {0}")]
UUIDParseError(#[from] uuid::Error),
#[error("could not determine filename of path {0}")]
CantGetFilename(PathBuf),
#[error("path {0} could not be converted to UTF-8")]
FilenameNotUTF8(PathBuf),
#[error("parent hash was not set before computing child hash")]
ParentHashNotSet,
#[error("encountered depth jump >1, indicating an error in the directory stack or query")]
TooBigDepthJump,
NotImplemented,
}
#[derive(Clone,Debug)]
struct FileInfo {
parent_sha256: Option<
[u8; 32]
>,
content_hash: [u8; 32],
}
impl Default for FileInfo {
fn default() -> Self {
FileInfo {
parent_sha256: None,
content_hash: [0; 32],
}
}
}
#[derive(Copy,Clone,Debug,From)]
@ -98,7 +70,7 @@ pub struct Hash256([u8; 32]);
impl fmt::LowerHex for Hash256 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for byte in self.0 {
write!(f, "{:x}", byte);
write!(f, "{:x}", byte)?;
}
Ok(())
}
@ -106,7 +78,7 @@ impl fmt::LowerHex for Hash256 {
impl fmt::UpperHex for Hash256 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for byte in self.0 {
write!(f, "{:X}", byte);
write!(f, "{:X}", byte)?;
}
Ok(())
}
@ -114,7 +86,7 @@ impl fmt::UpperHex for Hash256 {
impl fmt::Display for Hash256 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for byte in self.0 {
write!(f, "{:x}", byte);
write!(f, "{:x}", byte)?;
}
Ok(())
}
@ -125,7 +97,7 @@ impl ToSql for Hash256 {
}
}
fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> {
fn buffered_hash256<R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> {
let mut ctx = Context::new(&SHA256);
let mut buffer = [0; 1024 * 128];
let mut num_bytes: usize = 0;
@ -260,7 +232,7 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> {
let (chash, size_bytes) = buffered_hash256(reader)?;
Ok((*id, chash, size_bytes, Instant::now()))
} else { // should never happen
Ok((0 as i32, Hash256([0; 32]), 0, Instant::now()))
Ok((0_i32, Hash256([0; 32]), 0, Instant::now()))
}
}).collect();
let mut update_stmt = tx.prepare(
@ -271,22 +243,20 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> {
recorded_time = ?4
WHERE
id = ?1")?;
for hashresult in hashes {
if let Ok((id, hash, size_bytes, recorded_instant)) = hashresult {
update_stmt.execute((
id,
hash,
size_bytes,
persistent_stamp(recorded_instant, start_instant, start_systemtime),
))?;
}
for (id, hash, size_bytes, recorded_instant) in hashes.into_iter().flatten() {
update_stmt.execute((
id,
hash,
size_bytes,
persistent_stamp(recorded_instant, start_instant, start_systemtime),
))?;
}
Ok(())
}
/// Find latest entries in filedir_version that are not deleted, and that do not appear in
/// current_files, but whose parents _do_ appear. Create new versions marking these as deleted.
fn find_deleted(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
fn find_deleted(_tx: &Transaction, _p: &Path, _path_key: Hash256) -> Result<(), RecordError> {
// Do a CTE from filedir to get key and relpath for all files below p (inclusive)
// For files in filedir but not in current_files, insert entries into temp table deleted_files
@ -297,7 +267,7 @@ fn find_deleted(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), Rec
}
/// Remove rows from deleted_files which were previously deleted in the database
fn prune_deleted(tx: &Transaction) -> Result<(), RecordError> {
fn prune_deleted(_tx: &Transaction) -> Result<(), RecordError> {
// Join filedir_version to deleted_files to get latest deleted status, then drop
log::warn!("prune_deleted not yet implemented");
@ -305,13 +275,14 @@ fn prune_deleted(tx: &Transaction) -> Result<(), RecordError> {
}
/// Compute directory hashes in the current_files table
fn compute_current_directory_hashes(tx: &Transaction) -> Result<(), RecordError> {
fn compute_current_directory_hashes(_tx: &Transaction) -> Result<(), RecordError> {
// Extract files with a CTE on current_files, in alphabetical order. For each row, if it's not
// a directory compare parent column to see whether we've changed to a new parent, in which
// case we finalize this parent dir by recording its size and hash. Once a directory is
// finalized, we can update its row in current_files. The recorded time of a directory should
// be the max of all children recorded times.
let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new();
//let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new();
// Find all entries in current_files that declare parents that are not None and are not in
// current_files. Each of these parents must be added with a new version. The content_hash must
@ -416,11 +387,12 @@ pub fn record(
for a in ancestors {
// hash of parent hash + filename
let filename = a.file_name()
.ok_or(RecordError::CantGetFilename)?
.to_str().ok_or(RecordError::FilenameNotUTF8)?;
.ok_or_else(|| RecordError::CantGetFilename(a.to_owned()))?
.to_str()
.ok_or_else(|| RecordError::FilenameNotUTF8(a.to_owned()))?;
let mut ctx = Context::new(&SHA256);
ctx.update(&prev_key.0);
ctx.update(&filename.as_bytes());
ctx.update(filename.as_bytes());
hashbuf.clone_from_slice(ctx.finish().as_ref());
prev_key = Hash256(hashbuf);
}

View File

@ -1,16 +1,18 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{Parser, Subcommand};
use rusqlite::{Connection, OpenFlags};
use uuid::{Uuid};
use uuid::Uuid;
use std::path::{Path, PathBuf};
use std::process;
use nancy::{fs, program};
// Composable provenance tracking for scientific data analysis
#[derive(Parser)]
#[command(author, version, about, long_about = None, arg_required_else_help=true)]
struct Cli {
#[command(subcommand)]
command: Option<Command>,
command: Command,
}
#[derive(Subcommand)]
@ -36,163 +38,104 @@ enum Command {
},
/// Check for changes in dataset and print basic statistics
Status {},
/// Just say hello
Hello {
//#[arg(short, long)]
//fromlib: bool,
},
}
fn init_schema(conn: &mut Connection, name: &str) -> Uuid {
match nancy::db::init(conn, name) {
Err(e) => {
log::error!("Encountered error in initializing schema: {:?}", e);
process::exit(1);
}
Ok(dataset_uuid) => {
log::trace!("Init OK");
log::info!("Dataset UUID is {dataset_uuid}");
// Run an empty program so that the dataset log reflects when it was
// initialized
if let Err(e) = nancy::program::with_program(
conn,
"INIT",
"Initialize dataset",
|prog| {
let _ = prog.perform_task(&[], |task| {
log::debug!("INIT task UUID is {}", task.key);
Ok::<(), ()>(())
});
let okres: Result<(), ()> = Ok(());
okres
},
) {
log::error!("Empty program error: {e:?}");
process::exit(1);
}
dataset_uuid
}
fn init_schema(conn: &mut Connection, name: &str) -> Result<Uuid> {
let dataset_uuid = nancy::db::init(conn, name)
.map_err(|e| anyhow!("failed to initialize schema: {}", e))?;
log::trace!("Init OK");
log::info!("Dataset UUID is {dataset_uuid}");
// Run an empty program so that the dataset log reflects when it was
// initialized
nancy::program::with_program(
conn,
"INIT",
"Initialize dataset",
|prog| {
let _ = prog.perform_task(&[], |task| {
log::debug!("INIT task UUID is {}", task.key);
Ok::<(), ()>(())
});
let okres: Result<()> = Ok(());
okres
},
)
.context("Could not run empty program during init_schema")??;
Ok(dataset_uuid)
}
/// Run init subcommand and return the return code
fn init_cmd(name: &str, dataset_path: &Path) -> Result<Uuid> {
if !dataset_path.is_dir() {
bail!("Path {:?} does not point to an existing directory", dataset_path);
}
let dbpath = &dataset_path.join("nancy.db");
if dbpath.exists() {
bail!("Database {:?} exists, indicating this dataset is already \
initialized. Refusing to overwrite.", dbpath);
}
log::info!("Initializing new database at {:?}", dbpath);
let mut conn = Connection::open(dbpath)
.with_context(|| format!("Could not open new SQLite database at {dbpath:?}"))?;
conn.pragma_update(None, "foreign_keys", &"ON")
.context("Could not set foreign_keys pragma")?;
let u = init_schema(&mut conn, name)?;
Ok(u)
}
fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) -> () {
if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog| {
prog.perform_task(&[], |task| {
// Note that this may fail, in which case we should roll back only this program
// but keep the dataset initialized.
nancy::fs::record(prog.transaction, paths, &dataset_path, message, task.key)
})
}) {
log::error!("Encountered error in RECORD program: {:?}", e);
process::exit(1);
};
fn record_cmd(message: &str, record_paths: &Vec<PathBuf>) -> Result<()> {
// If no paths are given, use ["."] for the following steps.
// Determine dataset dir (ds_dir)
let dataset_path = nancy::db::find_dataset_dir(record_paths)
.with_context(|| "Could not determine dataset directory")?;
log::info!("Found existing dataset at path: {:?}", dataset_path);
let dbpath = &dataset_path.join("nancy.db");
// open with flags to prevent creating when we believe the db exists
let mut conn = Connection::open_with_flags(
dbpath,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
).context("Could not open existing SQLite database at {dbpath:?}: {e:?}")?;
conn.pragma_update(None, "foreign_keys", &"ON")?;
nancy::db::ensure_schema(&mut conn)?;
// Note that recording may fail, in which case we should roll back only this program but keep
// the dataset initialized.
program::with_program(&mut conn, "RECORD", message, |prog| {
prog.perform_task(&[], |task| {
fs::record(
prog.transaction,
record_paths.as_slice(),
&dataset_path,
message,
task.key,
)
})
})??.map_err(|e| anyhow!("Record program failed: {}", e))
}
fn main() {
fn main() -> Result<()> {
env_logger::init();
let args = Cli::parse();
match &args.command {
Some(Command::Hello {}) => {
println!("Hello from nancy (binary)!");
match nancy::print_uuid() {
Ok(_) => {
println!("OK");
}
Err(e) => {
println!("SQLite error: {}", e);
}
};
}
Some(Command::Init {
Command::Init {
name,
dataset_path
}) => {
if !dataset_path.is_dir() {
log::error!("Path {:?} does not point to an existing directory", dataset_path);
process::exit(1);
}
let dbpath = &dataset_path.join("nancy.db");
if dbpath.exists() {
log::error!("Database {:?} exists, indicating this dataset is already \
initialized. Refusing to overwrite.", dbpath);
process::exit(1);
}
log::info!("Initializing new database at {:?}", dbpath);
let mut conn = match Connection::open(dbpath) {
Err(e) => {
log::error!(
"Could not open new SQLite database at {dbpath:?}: {:?}",
e
);
process::exit(1);
}
Ok(cc) => cc,
};
conn.pragma_update(None, "foreign_keys", &"ON").unwrap();
let u = init_schema(&mut conn, name);
do_record(
&mut conn,
&format!("Initial recording of dataset {:?}", u),
&[dataset_path.to_path_buf()],
dataset_path,
);
}
Some(Command::Record {
} => { init_cmd(name, dataset_path)?; },
Command::Record {
message,
record_paths,
}) => {
// If no paths are given, use ["."] for the following steps.
// Determine dataset dir (ds_dir)
match nancy::db::find_dataset_dir(record_paths) {
Err(nancy::db::FindDatasetError::NoDataset(path)) => {
log::info!("No dataset at or above nearest ancestor path: {:?}", path);
log::error!(
"Refusing to initialize a new dataset at {path:?}. \
Use `nancy init` to create a dataset instead."
);
process::exit(1);
}
Err(e) => {
log::error!("Could not determine dataset directory: {:?}", e);
process::exit(1);
}
Ok(path) => {
// existing
log::info!("Found existing dataset at path: {:?}", path);
let dbpath = &path.join("nancy.db");
// open with flags to prevent creating when we believe the db exists
let mut conn = match Connection::open_with_flags(
dbpath,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
) {
Err(e) => {
log::error!(
"Could not open existing SQLite database at {dbpath:?}: {e:?}"
);
process::exit(1);
}
Ok(cc) => cc,
};
conn.pragma_update(None, "foreign_keys", &"ON").unwrap();
match nancy::db::ensure_schema(&mut conn) {
Err(e) => {
log::error!("Error ensuring schema: {}", e);
process::exit(1);
}
Ok(update_result) => {
log::debug!("Schema ensured: {:?}", update_result);
}
}
do_record(&mut conn, message, record_paths.as_slice(), &path);
}
};
}
Some(Command::Status {}) => {
} => record_cmd(message, record_paths)?,
Command::Status {} => {
println!("status not yet implemented");
}
None => {}
}
Ok(())
}

View File

@ -1,11 +1,13 @@
use anyhow::Result;
use log;
use rusqlite::{Connection, Error as RSError, Transaction};
extern crate derive_more;
use thiserror::Error;
use uuid::Uuid;
use std::time::{Instant, SystemTime};
use crate::environment::{Environment};
use crate::environment::Environment;
use crate::timing;
#[derive(Debug)]
@ -13,13 +15,19 @@ pub struct Task {
pub key: Uuid,
}
#[derive(Debug, Error)]
pub enum TaskError {
#[error("Inserting task information into database failed: {0}")]
InsertTaskFailed(RSError),
}
#[derive(Debug)]
pub struct TaskInput {
pub task: Task,
//datum: data::Datum,
}
impl Task {
pub fn new(program: &Program) -> Result<Task, RSError> {
pub fn new(program: &Program) -> Result<Task, TaskError> {
let key = Uuid::new_v4();
log::debug!("New Task with UUID {}", key);
@ -30,38 +38,39 @@ impl Task {
key.as_bytes(),
program.key.as_bytes(),
),
)?;
).map_err(TaskError::InsertTaskFailed)?;
Ok(Task {
key: key,
key,
})
}
}
#[derive(Debug)]
pub enum ProgramError<E> {
#[derive(Debug, Error)]
pub enum ProgramError{
#[error("creating transaction failed: {0}")]
CreateTransactionFailed(RSError),
#[error("creating new program failed: {0}")]
NewProgramFailed(RSError),
ProgramFailed(E),
#[error("recording environment failed: {0}")]
RecordEnvFailed(RSError),
#[error("inserting program into database failed: {0}")]
InsertProgramFailed(RSError),
#[error("recording program timestamps failed: {0}")]
RecordTimestampsFailed(RSError),
NewTaskFailed(RSError),
#[error("creating new task failed: {0}")]
NewTaskFailed(TaskError),
#[error("failed commiting SQLite transaction: {0}")]
CommitFailed(RSError),
#[error("performed a task while program not yet running, or after it finished")]
PerformedTaskWhileNotRunning,
}
impl<E> From<E> for ProgramError<E> {
fn from(e: E) -> ProgramError<E> {
ProgramError::ProgramFailed(e)
}
}
#[derive(Clone,Debug,PartialEq)]
enum ProgramState {
Initialized,
Running,
Finished,
Error,
}
#[derive(Debug)]
@ -73,11 +82,10 @@ pub struct Program<'conn> {
pub transaction: &'conn Transaction<'conn>,
start_systemtime: SystemTime,
start_instant: Instant,
start_stamp: f64,
state: ProgramState,
}
impl<'conn> Program<'conn> {
pub fn new<E>(tx: &'conn Transaction, name: &str, message: &str) -> Result<Program<'conn>, ProgramError<E>> {
pub fn new(tx: &'conn Transaction, name: &str, message: &str) -> Result<Program<'conn>, ProgramError> {
let log_target = &format!("nancy.program ({})", name);
// start transaction
@ -97,20 +105,19 @@ impl<'conn> Program<'conn> {
let key = Uuid::new_v4();
log::debug!("New {} Program with UUID {}", name, key);
let mut prog = Program {
key: key,
let prog = Program {
key,
name: name.to_string(),
message: message.to_string(),
environment: Environment::current(None),
transaction: tx,
start_systemtime: start_st,
start_instant: start,
start_stamp: start_stamp,
state: ProgramState::Initialized,
};
prog.environment.record(prog.transaction)
.map_err(|e| ProgramError::RecordEnvFailed(e))?;
.map_err(ProgramError::RecordEnvFailed)?;
log::debug!("Environment: {:#?}", prog.environment);
prog.transaction.execute(
@ -121,22 +128,24 @@ impl<'conn> Program<'conn> {
name,
message,
),
).map_err(|e| ProgramError::InsertProgramFailed(e))?;
).map_err(ProgramError::InsertProgramFailed)?;
Ok(prog)
}
pub fn perform_task<E, R, F>(self: &mut Self, inputs: &[TaskInput], f: F) -> Result<R, ProgramError<E>>
pub fn perform_task<R, F>(&mut self, _inputs: &[TaskInput], f: F) -> Result<R, ProgramError>
where
F: FnOnce(&mut Task) -> Result<R, E>,
F: FnOnce(&Task) -> R,
{
if self.state != ProgramState::Running {
log::error!("Performed task in state={:?}", self.state);
return Err(ProgramError::PerformedTaskWhileNotRunning);
}
let mut task = Task::new(self).map_err(|e| ProgramError::NewTaskFailed(e))?;
f(&mut task).map_err(|e| ProgramError::ProgramFailed(e))
let task = Task::new(self).map_err(ProgramError::NewTaskFailed)?;
Ok(f(&task))
}
pub fn record_timestamps<E>(self: &mut Self) -> Result<(), ProgramError<E>> {
pub fn record_timestamps(&mut self) -> Result<(), ProgramError> {
let log_target = &format!("nancy.program ({})", self.name);
// record end time
@ -154,20 +163,14 @@ impl<'conn> Program<'conn> {
self.transaction.execute("UPDATE program SET end_time = ?1 WHERE key = ?2",
(end_stamp, self.key.as_bytes()),
).map_err(|e| ProgramError::RecordTimestampsFailed(e))?;
).map_err(ProgramError::RecordTimestampsFailed)?;
Ok(())
}
pub fn finish<E>(self: &mut Self) -> Result<(), ProgramError<E>> {
let log_target = &format!("nancy.program ({})", self.name);
self.record_timestamps()?;
Ok(())
}
}
impl<'conn> Drop for Program<'conn> {
/// Checks that the program has been cleaned up by calling the .finish() (which might)
fn drop(self: &mut Self) {
/// Checks that the program has been cleaned up by setting state to Finished
fn drop(&mut self) {
if self.state != ProgramState::Finished {
log::error!("Program reached destructor with unfinished state={:?}", self.state);
}
@ -175,12 +178,14 @@ impl<'conn> Drop for Program<'conn> {
}
/// Run a closure as a program, within a database transaction
pub fn with_program<E, F: FnOnce(&mut Program) -> Result<(), E>>(
pub fn with_program<F, R, E>(
conn: &mut Connection,
name: &str,
message: &str,
f: F,
) -> Result<(), ProgramError<E>> {
) -> Result<Result<R, E>, ProgramError>
where
F: FnOnce(&mut Program) -> Result<R, E> {
// NOTE: for Errors outside of f(prog), we should not rely on ?, so that we can keep the
// From instance for ProgramError as general as possible. Instead, we should manually check
// errors in the surrounding code in this function and only use ? for f(prog).
@ -203,19 +208,22 @@ pub fn with_program<E, F: FnOnce(&mut Program) -> Result<(), E>>(
prog.state = ProgramState::Running;
// run closure with program argument
f(&mut prog)?; // if closure fails, transaction will be rolled back
// if closure fails, transaction will be rolled back
let ret = f(&mut prog);
// avoid committing if program failed
if ret.is_ok() {
prog.record_timestamps()?;
prog.record_timestamps()?;
// commit transaction
log::debug!(target: log_target, "Committing transaction and finalizing program");
// commit transaction
log::debug!(target: log_target, "Committing transaction and finalizing program");
prog.state = ProgramState::Finished;
log::debug!(target: log_target, "Set prog.state to Finished");
drop(prog); // stop borrowing tx, since tx.commit() will consume tx
log::debug!(target: log_target, "Dropped prog");
prog.state = ProgramState::Finished;
log::debug!(target: log_target, "Set prog.state to Finished");
drop(prog); // stop borrowing tx, since tx.commit() will consume tx
log::debug!(target: log_target, "Dropped prog");
tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })?;
}
tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })?;
Ok(())
Ok(ret)
}