diff --git a/Cargo.lock b/Cargo.lock index baea1e6..a7898ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,6 +22,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "anyhow" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" + [[package]] name = "arrayref" version = "0.3.6" @@ -562,6 +568,7 @@ dependencies = [ name = "nancy" version = "0.1.0" dependencies = [ + "anyhow", "assert_cmd", "assert_fs", "blake3", @@ -578,6 +585,7 @@ dependencies = [ "rusqlite", "rusqlite_migration", "sys-info", + "thiserror", "uuid", "whoami", ] @@ -918,6 +926,26 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95059e91184749cb66be6dc994f67f182b6d897cb3df74a5bf66b5e709295fd8" +[[package]] +name = "thiserror" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.4" diff --git a/Cargo.toml b/Cargo.toml index e7ccda1..a38b80a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ assert_fs = "1.0.9" predicates = "2.1.3" [dependencies] +anyhow = "1.0.66" blake3 = "1.3.1" clap = { version = "4.0.14", features = ["derive"] } derive_more = "0.99.17" @@ -37,5 +38,6 @@ ring = "0.16.20" rusqlite = { version = "0.28.0", features = ["uuid"] } rusqlite_migration = "1.0.0" sys-info = "0.9.1" +thiserror = "1.0.37" uuid = { version = "1.2.1", features = ["v4", "v5"] } whoami = "1.2.3" diff --git a/src/db.rs b/src/db.rs index 685a757..14c19a0 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,14 +3,16 @@ //! This module is mostly for managing migrations. This does not encapsulate all database accesses. //! +use anyhow::Result; extern crate derive_more; -use derive_more::{Display, Error, From}; +use derive_more::{Display, From}; use log; use once_cell::sync::Lazy; use rusqlite; use rusqlite::Connection; use rusqlite_migration::{Error as RMError, Migrations, SchemaVersion, M}; +use thiserror::Error; use uuid; use std::collections::HashSet; @@ -26,7 +28,7 @@ static MIGRATIONS: Lazy> = Lazy::new(|| { pub static CURRENT_SCHEMA_VERSION: usize = 1; /// Error type for checking schema version of a Connection -#[derive(Debug, Display)] +#[derive(Debug, Display, Error)] pub enum SchemaError { CurrentVersionError(RMError), NoSchemaVersionSet, @@ -86,7 +88,7 @@ pub fn ensure_schema(conn: &mut Connection) -> Result Result { Ok(uuid) } -#[derive(Debug)] +#[derive(Debug, Error)] pub enum FindDatasetError { - /// An empty list of paths was provided + #[error("An empty list of paths was provided")] NoPathsProvided, - /// Something went wrong when parsing paths + #[error("Something went wrong when parsing paths")] PathError(std::io::Error), - /// None of the search paths belongs to an existing dataset. Returned path is the nearest - /// common ancestor of all searched paths (on same filesystem) + #[error("None of the search paths belongs to an existing dataset. Returned path is the nearest + common ancestor of all searched paths (on same filesystem)")] NoDataset(PathBuf), - /// Some, but not all, search paths do not reside in an existing dataset + #[error("Some, but not all, search paths do not reside in an existing dataset")] SomeNotInDataset, - /// Searched paths belong to multiple existing datasets (or some may belong to None) + #[error("Searched paths belong to multiple existing datasets (or some may belong to None)")] MultipleDatasets { datasets: Vec, some_paths_not_in_dataset: bool, }, - // TODO: REMOVE THIS - NotImplemented, } impl From for FindDatasetError { fn from(e: std::io::Error) -> FindDatasetError { @@ -142,7 +142,7 @@ pub fn find_dataset_dir(paths: &[PathBuf]) -> Result .canonicalize()?; log::debug!("First path is {:?}", first_path); - let mut common_path = first_path.to_path_buf(); + let mut common_path = first_path; log::debug!("First path as PathBuf is {:?}", common_path); let mut found_common_path = false; @@ -189,7 +189,7 @@ pub fn find_dataset_dir(paths: &[PathBuf]) -> Result log::debug!("Did not find a common path"); } - if ds_dirs.len() == 0 { + if ds_dirs.is_empty() { Err(FindDatasetError::NoDataset(common_path)) } else if ds_dirs.len() == 1 { let d = ds_dirs diff --git a/src/environment.rs b/src/environment.rs index 416cde5..f47b606 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -1,12 +1,10 @@ use machine_uid; -use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes}; +use rusqlite::{Result as RSResult, Transaction}; use sys_info; -use uuid::{Uuid}; +use uuid::Uuid; use whoami; -use std::collections::{HashMap}; -use std::env; -use std::fs::{File}; +use std::fs::File; use std::io::{Error as IOError, Read}; /// Some simple namespaces to use for deriving UUID v5 keys @@ -21,21 +19,21 @@ pub struct Machine { } impl Machine { pub fn current() -> Self { - let hostname = sys_info::hostname().unwrap_or("Unknown".to_string()); - let mid = machine_uid::get().unwrap_or("Unknown".to_string()); - let os_type = sys_info::os_type().unwrap_or("Unknown".to_string()); + let hostname = sys_info::hostname().unwrap_or_else(|_| "Unknown".to_string()); + let mid = machine_uid::get().unwrap_or_else(|_| "Unknown".to_string()); + let os_type = sys_info::os_type().unwrap_or_else(|_| "Unknown".to_string()); let key = Uuid::new_v5( &NAMESPACE_MACHINE, format!("{},{},{}", hostname, mid, os_type).as_bytes(), ); Machine { - key: key, + key, machine_id: mid, - hostname: hostname, - os_type: os_type, + hostname, + os_type, } } - pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> { + pub fn record(&self, tx: &Transaction) -> RSResult<()> { tx.execute( "INSERT OR IGNORE INTO machine VALUES (?1, ?2, ?3, ?4)", ( @@ -66,14 +64,14 @@ impl User { format!("{}\n{}", username, realname).as_bytes(), ); User { - key: key, - machine: machine, - username: username, - realname: realname, + key, + machine, + username, + realname, } } - pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> { - self.machine.record(tx); + pub fn record(&self, tx: &Transaction) -> RSResult<()> { + self.machine.record(tx)?; tx.execute( "INSERT OR IGNORE INTO user VALUES (?1, ?2, ?3, ?4)", ( @@ -106,9 +104,9 @@ impl Environment { f.read_to_string(&mut s)?; Ok(s) })(); - let lor = lor_result.unwrap_or("Unknown".to_string()); + let lor = lor_result.unwrap_or_else(|_| "Unknown".to_string()); let user = User::current(); - let os_release = sys_info::os_release().unwrap_or("Unknown".to_string()); + let os_release = sys_info::os_release().unwrap_or_else(|_| "Unknown".to_string()); //let env_vars = HashMap::from_iter(env::vars()); let key = Uuid::new_v5( &user.key, @@ -119,16 +117,16 @@ impl Environment { ).as_bytes(), ); Environment { - key: key, - user: user, - os_release: os_release, + key, + user, + os_release, linux_os_release: lor, //env_vars: env_vars, additional_info: addl_info, } } - pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> { - self.user.record(tx); + pub fn record(&self, tx: &Transaction) -> RSResult<()> { + self.user.record(tx)?; tx.execute( "INSERT OR IGNORE INTO environment VALUES (?1, ?2, ?3, ?4, NULL, ?5)", ( diff --git a/src/fs.rs b/src/fs.rs index cdbf081..c8945f3 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1,19 +1,21 @@ -use derive_more::{From}; -use jwalk::{Error as JWalkError, WalkDir, WalkDirGeneric}; +use anyhow::Result; +use derive_more::From; +use jwalk::{Error as JWalkError, WalkDir}; use log; use rayon::prelude::*; use ring::digest::{Context, SHA256}; use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes}; -use uuid::{Uuid}; +use thiserror::Error; +use uuid::Uuid; -use std::collections::{LinkedList}; +use std::collections::LinkedList; use std::fmt; -use std::fs::{File}; +use std::fs::File; use std::io::{BufReader, Error as IOError, Read, Result as IOResult}; use std::path::{Path, PathBuf}; use std::time::{Instant, SystemTime}; -use crate::timing::{persistent_stamp}; +use crate::timing::persistent_stamp; #[derive(Copy, Clone, Debug, PartialEq)] @@ -38,59 +40,29 @@ impl From for FileType { } impl fmt::Display for FileType { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{:?}", self); + write!(f, "{:?}", self)?; Ok(()) } } - -#[derive(Debug)] -struct FSVersion { - uuid: Uuid, - //content_sha256: [u8; 32], - //filename: String, - //parent: [u8; 32], -} -impl FSVersion { - fn from_file(path: &Path, dataset_uuid: Uuid, dataset_root: &Path) -> FSVersion { - let u = Uuid::new_v4(); - //let hasher = Sha256::new(); - FSVersion { - uuid: u, - //filename: "", - //content_sha256: hasher.finalize(), - } - } -} - - -#[derive(Debug, From)] +#[derive(Debug, Error)] pub enum RecordError { - SQLError(RSError), - IOError(IOError), - DirectoryWalkError(JWalkError), - UUIDParseError(uuid::Error), - CantGetFilename, - FilenameNotUTF8, + #[error("SQLite error: {0}")] + SQLError(#[from] RSError), + #[error("IO error: {0}")] + IOError(#[from] IOError), + #[error("error walking directory: {0}")] + DirectoryWalkError(#[from] JWalkError), + #[error("error parsing UUID: {0}")] + UUIDParseError(#[from] uuid::Error), + #[error("could not determine filename of path {0}")] + CantGetFilename(PathBuf), + #[error("path {0} could not be converted to UTF-8")] + FilenameNotUTF8(PathBuf), + #[error("parent hash was not set before computing child hash")] ParentHashNotSet, + #[error("encountered depth jump >1, indicating an error in the directory stack or query")] TooBigDepthJump, - NotImplemented, -} - -#[derive(Clone,Debug)] -struct FileInfo { - parent_sha256: Option< - [u8; 32] - >, - content_hash: [u8; 32], -} -impl Default for FileInfo { - fn default() -> Self { - FileInfo { - parent_sha256: None, - content_hash: [0; 32], - } - } } #[derive(Copy,Clone,Debug,From)] @@ -98,7 +70,7 @@ pub struct Hash256([u8; 32]); impl fmt::LowerHex for Hash256 { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { for byte in self.0 { - write!(f, "{:x}", byte); + write!(f, "{:x}", byte)?; } Ok(()) } @@ -106,7 +78,7 @@ impl fmt::LowerHex for Hash256 { impl fmt::UpperHex for Hash256 { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { for byte in self.0 { - write!(f, "{:X}", byte); + write!(f, "{:X}", byte)?; } Ok(()) } @@ -114,7 +86,7 @@ impl fmt::UpperHex for Hash256 { impl fmt::Display for Hash256 { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { for byte in self.0 { - write!(f, "{:x}", byte); + write!(f, "{:x}", byte)?; } Ok(()) } @@ -125,7 +97,7 @@ impl ToSql for Hash256 { } } -fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> { +fn buffered_hash256(mut reader: R) -> IOResult<(Hash256, usize)> { let mut ctx = Context::new(&SHA256); let mut buffer = [0; 1024 * 128]; let mut num_bytes: usize = 0; @@ -260,7 +232,7 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> { let (chash, size_bytes) = buffered_hash256(reader)?; Ok((*id, chash, size_bytes, Instant::now())) } else { // should never happen - Ok((0 as i32, Hash256([0; 32]), 0, Instant::now())) + Ok((0_i32, Hash256([0; 32]), 0, Instant::now())) } }).collect(); let mut update_stmt = tx.prepare( @@ -271,22 +243,20 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> { recorded_time = ?4 WHERE id = ?1")?; - for hashresult in hashes { - if let Ok((id, hash, size_bytes, recorded_instant)) = hashresult { - update_stmt.execute(( - id, - hash, - size_bytes, - persistent_stamp(recorded_instant, start_instant, start_systemtime), - ))?; - } + for (id, hash, size_bytes, recorded_instant) in hashes.into_iter().flatten() { + update_stmt.execute(( + id, + hash, + size_bytes, + persistent_stamp(recorded_instant, start_instant, start_systemtime), + ))?; } Ok(()) } /// Find latest entries in filedir_version that are not deleted, and that do not appear in /// current_files, but whose parents _do_ appear. Create new versions marking these as deleted. -fn find_deleted(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> { +fn find_deleted(_tx: &Transaction, _p: &Path, _path_key: Hash256) -> Result<(), RecordError> { // Do a CTE from filedir to get key and relpath for all files below p (inclusive) // For files in filedir but not in current_files, insert entries into temp table deleted_files @@ -297,7 +267,7 @@ fn find_deleted(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), Rec } /// Remove rows from deleted_files which were previously deleted in the database -fn prune_deleted(tx: &Transaction) -> Result<(), RecordError> { +fn prune_deleted(_tx: &Transaction) -> Result<(), RecordError> { // Join filedir_version to deleted_files to get latest deleted status, then drop log::warn!("prune_deleted not yet implemented"); @@ -305,13 +275,14 @@ fn prune_deleted(tx: &Transaction) -> Result<(), RecordError> { } /// Compute directory hashes in the current_files table -fn compute_current_directory_hashes(tx: &Transaction) -> Result<(), RecordError> { +fn compute_current_directory_hashes(_tx: &Transaction) -> Result<(), RecordError> { // Extract files with a CTE on current_files, in alphabetical order. For each row, if it's not // a directory compare parent column to see whether we've changed to a new parent, in which // case we finalize this parent dir by recording its size and hash. Once a directory is // finalized, we can update its row in current_files. The recorded time of a directory should // be the max of all children recorded times. - let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new(); + + //let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new(); // Find all entries in current_files that declare parents that are not None and are not in // current_files. Each of these parents must be added with a new version. The content_hash must @@ -416,11 +387,12 @@ pub fn record( for a in ancestors { // hash of parent hash + filename let filename = a.file_name() - .ok_or(RecordError::CantGetFilename)? - .to_str().ok_or(RecordError::FilenameNotUTF8)?; + .ok_or_else(|| RecordError::CantGetFilename(a.to_owned()))? + .to_str() + .ok_or_else(|| RecordError::FilenameNotUTF8(a.to_owned()))?; let mut ctx = Context::new(&SHA256); ctx.update(&prev_key.0); - ctx.update(&filename.as_bytes()); + ctx.update(filename.as_bytes()); hashbuf.clone_from_slice(ctx.finish().as_ref()); prev_key = Hash256(hashbuf); } diff --git a/src/main.rs b/src/main.rs index 781bde9..3d844f0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,16 +1,18 @@ +use anyhow::{anyhow, bail, Context, Result}; use clap::{Parser, Subcommand}; use rusqlite::{Connection, OpenFlags}; -use uuid::{Uuid}; +use uuid::Uuid; use std::path::{Path, PathBuf}; -use std::process; + +use nancy::{fs, program}; // Composable provenance tracking for scientific data analysis #[derive(Parser)] #[command(author, version, about, long_about = None, arg_required_else_help=true)] struct Cli { #[command(subcommand)] - command: Option, + command: Command, } #[derive(Subcommand)] @@ -36,163 +38,104 @@ enum Command { }, /// Check for changes in dataset and print basic statistics Status {}, - /// Just say hello - Hello { - //#[arg(short, long)] - //fromlib: bool, - }, } -fn init_schema(conn: &mut Connection, name: &str) -> Uuid { - match nancy::db::init(conn, name) { - Err(e) => { - log::error!("Encountered error in initializing schema: {:?}", e); - process::exit(1); - } - Ok(dataset_uuid) => { - log::trace!("Init OK"); - log::info!("Dataset UUID is {dataset_uuid}"); - // Run an empty program so that the dataset log reflects when it was - // initialized - if let Err(e) = nancy::program::with_program( - conn, - "INIT", - "Initialize dataset", - |prog| { - let _ = prog.perform_task(&[], |task| { - log::debug!("INIT task UUID is {}", task.key); - Ok::<(), ()>(()) - }); - let okres: Result<(), ()> = Ok(()); - okres - }, - ) { - log::error!("Empty program error: {e:?}"); - process::exit(1); - } - dataset_uuid - } +fn init_schema(conn: &mut Connection, name: &str) -> Result { + let dataset_uuid = nancy::db::init(conn, name) + .map_err(|e| anyhow!("failed to initialize schema: {}", e))?; + + log::trace!("Init OK"); + log::info!("Dataset UUID is {dataset_uuid}"); + // Run an empty program so that the dataset log reflects when it was + // initialized + nancy::program::with_program( + conn, + "INIT", + "Initialize dataset", + |prog| { + let _ = prog.perform_task(&[], |task| { + log::debug!("INIT task UUID is {}", task.key); + Ok::<(), ()>(()) + }); + let okres: Result<()> = Ok(()); + okres + }, + ) + .context("Could not run empty program during init_schema")??; + + Ok(dataset_uuid) +} + +/// Run init subcommand and return the return code +fn init_cmd(name: &str, dataset_path: &Path) -> Result { + if !dataset_path.is_dir() { + bail!("Path {:?} does not point to an existing directory", dataset_path); } + let dbpath = &dataset_path.join("nancy.db"); + if dbpath.exists() { + bail!("Database {:?} exists, indicating this dataset is already \ + initialized. Refusing to overwrite.", dbpath); + } + log::info!("Initializing new database at {:?}", dbpath); + let mut conn = Connection::open(dbpath) + .with_context(|| format!("Could not open new SQLite database at {dbpath:?}"))?; + conn.pragma_update(None, "foreign_keys", &"ON") + .context("Could not set foreign_keys pragma")?; + let u = init_schema(&mut conn, name)?; + + Ok(u) } -fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) -> () { - if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog| { - prog.perform_task(&[], |task| { - // Note that this may fail, in which case we should roll back only this program - // but keep the dataset initialized. - nancy::fs::record(prog.transaction, paths, &dataset_path, message, task.key) - }) - }) { - log::error!("Encountered error in RECORD program: {:?}", e); - process::exit(1); - }; +fn record_cmd(message: &str, record_paths: &Vec) -> Result<()> { + // If no paths are given, use ["."] for the following steps. + + // Determine dataset dir (ds_dir) + let dataset_path = nancy::db::find_dataset_dir(record_paths) + .with_context(|| "Could not determine dataset directory")?; + log::info!("Found existing dataset at path: {:?}", dataset_path); + let dbpath = &dataset_path.join("nancy.db"); + + // open with flags to prevent creating when we believe the db exists + let mut conn = Connection::open_with_flags( + dbpath, + OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX, + ).context("Could not open existing SQLite database at {dbpath:?}: {e:?}")?; + + conn.pragma_update(None, "foreign_keys", &"ON")?; + + nancy::db::ensure_schema(&mut conn)?; + + // Note that recording may fail, in which case we should roll back only this program but keep + // the dataset initialized. + program::with_program(&mut conn, "RECORD", message, |prog| { + prog.perform_task(&[], |task| { + fs::record( + prog.transaction, + record_paths.as_slice(), + &dataset_path, + message, + task.key, + ) + }) + })??.map_err(|e| anyhow!("Record program failed: {}", e)) } -fn main() { +fn main() -> Result<()> { env_logger::init(); let args = Cli::parse(); match &args.command { - Some(Command::Hello {}) => { - println!("Hello from nancy (binary)!"); - match nancy::print_uuid() { - Ok(_) => { - println!("OK"); - } - Err(e) => { - println!("SQLite error: {}", e); - } - }; - } - Some(Command::Init { + Command::Init { name, dataset_path - }) => { - if !dataset_path.is_dir() { - log::error!("Path {:?} does not point to an existing directory", dataset_path); - process::exit(1); - } - let dbpath = &dataset_path.join("nancy.db"); - if dbpath.exists() { - log::error!("Database {:?} exists, indicating this dataset is already \ - initialized. Refusing to overwrite.", dbpath); - process::exit(1); - } - log::info!("Initializing new database at {:?}", dbpath); - let mut conn = match Connection::open(dbpath) { - Err(e) => { - log::error!( - "Could not open new SQLite database at {dbpath:?}: {:?}", - e - ); - process::exit(1); - } - Ok(cc) => cc, - }; - conn.pragma_update(None, "foreign_keys", &"ON").unwrap(); - let u = init_schema(&mut conn, name); - do_record( - &mut conn, - &format!("Initial recording of dataset {:?}", u), - &[dataset_path.to_path_buf()], - dataset_path, - ); - } - Some(Command::Record { + } => { init_cmd(name, dataset_path)?; }, + Command::Record { message, record_paths, - }) => { - // If no paths are given, use ["."] for the following steps. - - // Determine dataset dir (ds_dir) - match nancy::db::find_dataset_dir(record_paths) { - Err(nancy::db::FindDatasetError::NoDataset(path)) => { - log::info!("No dataset at or above nearest ancestor path: {:?}", path); - log::error!( - "Refusing to initialize a new dataset at {path:?}. \ - Use `nancy init` to create a dataset instead." - ); - process::exit(1); - } - Err(e) => { - log::error!("Could not determine dataset directory: {:?}", e); - process::exit(1); - } - Ok(path) => { - // existing - log::info!("Found existing dataset at path: {:?}", path); - let dbpath = &path.join("nancy.db"); - // open with flags to prevent creating when we believe the db exists - let mut conn = match Connection::open_with_flags( - dbpath, - OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX, - ) { - Err(e) => { - log::error!( - "Could not open existing SQLite database at {dbpath:?}: {e:?}" - ); - process::exit(1); - } - Ok(cc) => cc, - }; - conn.pragma_update(None, "foreign_keys", &"ON").unwrap(); - match nancy::db::ensure_schema(&mut conn) { - Err(e) => { - log::error!("Error ensuring schema: {}", e); - process::exit(1); - } - Ok(update_result) => { - log::debug!("Schema ensured: {:?}", update_result); - } - } - do_record(&mut conn, message, record_paths.as_slice(), &path); - } - }; - - } - Some(Command::Status {}) => { + } => record_cmd(message, record_paths)?, + Command::Status {} => { println!("status not yet implemented"); } - None => {} } + Ok(()) } diff --git a/src/program.rs b/src/program.rs index 81f14a8..6edd662 100644 --- a/src/program.rs +++ b/src/program.rs @@ -1,11 +1,13 @@ +use anyhow::Result; use log; use rusqlite::{Connection, Error as RSError, Transaction}; extern crate derive_more; +use thiserror::Error; use uuid::Uuid; use std::time::{Instant, SystemTime}; -use crate::environment::{Environment}; +use crate::environment::Environment; use crate::timing; #[derive(Debug)] @@ -13,13 +15,19 @@ pub struct Task { pub key: Uuid, } +#[derive(Debug, Error)] +pub enum TaskError { + #[error("Inserting task information into database failed: {0}")] + InsertTaskFailed(RSError), +} + #[derive(Debug)] pub struct TaskInput { pub task: Task, //datum: data::Datum, } impl Task { - pub fn new(program: &Program) -> Result { + pub fn new(program: &Program) -> Result { let key = Uuid::new_v4(); log::debug!("New Task with UUID {}", key); @@ -30,38 +38,39 @@ impl Task { key.as_bytes(), program.key.as_bytes(), ), - )?; + ).map_err(TaskError::InsertTaskFailed)?; Ok(Task { - key: key, + key, }) } } -#[derive(Debug)] -pub enum ProgramError { +#[derive(Debug, Error)] +pub enum ProgramError{ + #[error("creating transaction failed: {0}")] CreateTransactionFailed(RSError), + #[error("creating new program failed: {0}")] NewProgramFailed(RSError), - ProgramFailed(E), + #[error("recording environment failed: {0}")] RecordEnvFailed(RSError), + #[error("inserting program into database failed: {0}")] InsertProgramFailed(RSError), + #[error("recording program timestamps failed: {0}")] RecordTimestampsFailed(RSError), - NewTaskFailed(RSError), + #[error("creating new task failed: {0}")] + NewTaskFailed(TaskError), + #[error("failed commiting SQLite transaction: {0}")] CommitFailed(RSError), + #[error("performed a task while program not yet running, or after it finished")] PerformedTaskWhileNotRunning, } -impl From for ProgramError { - fn from(e: E) -> ProgramError { - ProgramError::ProgramFailed(e) - } -} #[derive(Clone,Debug,PartialEq)] enum ProgramState { Initialized, Running, Finished, - Error, } #[derive(Debug)] @@ -73,11 +82,10 @@ pub struct Program<'conn> { pub transaction: &'conn Transaction<'conn>, start_systemtime: SystemTime, start_instant: Instant, - start_stamp: f64, state: ProgramState, } impl<'conn> Program<'conn> { - pub fn new(tx: &'conn Transaction, name: &str, message: &str) -> Result, ProgramError> { + pub fn new(tx: &'conn Transaction, name: &str, message: &str) -> Result, ProgramError> { let log_target = &format!("nancy.program ({})", name); // start transaction @@ -97,20 +105,19 @@ impl<'conn> Program<'conn> { let key = Uuid::new_v4(); log::debug!("New {} Program with UUID {}", name, key); - let mut prog = Program { - key: key, + let prog = Program { + key, name: name.to_string(), message: message.to_string(), environment: Environment::current(None), transaction: tx, start_systemtime: start_st, start_instant: start, - start_stamp: start_stamp, state: ProgramState::Initialized, }; prog.environment.record(prog.transaction) - .map_err(|e| ProgramError::RecordEnvFailed(e))?; + .map_err(ProgramError::RecordEnvFailed)?; log::debug!("Environment: {:#?}", prog.environment); prog.transaction.execute( @@ -121,22 +128,24 @@ impl<'conn> Program<'conn> { name, message, ), - ).map_err(|e| ProgramError::InsertProgramFailed(e))?; + ).map_err(ProgramError::InsertProgramFailed)?; Ok(prog) } - pub fn perform_task(self: &mut Self, inputs: &[TaskInput], f: F) -> Result> + + pub fn perform_task(&mut self, _inputs: &[TaskInput], f: F) -> Result where - F: FnOnce(&mut Task) -> Result, + F: FnOnce(&Task) -> R, { if self.state != ProgramState::Running { log::error!("Performed task in state={:?}", self.state); return Err(ProgramError::PerformedTaskWhileNotRunning); } - let mut task = Task::new(self).map_err(|e| ProgramError::NewTaskFailed(e))?; - f(&mut task).map_err(|e| ProgramError::ProgramFailed(e)) + let task = Task::new(self).map_err(ProgramError::NewTaskFailed)?; + Ok(f(&task)) } - pub fn record_timestamps(self: &mut Self) -> Result<(), ProgramError> { + + pub fn record_timestamps(&mut self) -> Result<(), ProgramError> { let log_target = &format!("nancy.program ({})", self.name); // record end time @@ -154,20 +163,14 @@ impl<'conn> Program<'conn> { self.transaction.execute("UPDATE program SET end_time = ?1 WHERE key = ?2", (end_stamp, self.key.as_bytes()), - ).map_err(|e| ProgramError::RecordTimestampsFailed(e))?; + ).map_err(ProgramError::RecordTimestampsFailed)?; Ok(()) } - pub fn finish(self: &mut Self) -> Result<(), ProgramError> { - let log_target = &format!("nancy.program ({})", self.name); - - self.record_timestamps()?; - Ok(()) - } } impl<'conn> Drop for Program<'conn> { - /// Checks that the program has been cleaned up by calling the .finish() (which might) - fn drop(self: &mut Self) { + /// Checks that the program has been cleaned up by setting state to Finished + fn drop(&mut self) { if self.state != ProgramState::Finished { log::error!("Program reached destructor with unfinished state={:?}", self.state); } @@ -175,12 +178,14 @@ impl<'conn> Drop for Program<'conn> { } /// Run a closure as a program, within a database transaction -pub fn with_program Result<(), E>>( +pub fn with_program( conn: &mut Connection, name: &str, message: &str, f: F, -) -> Result<(), ProgramError> { +) -> Result, ProgramError> +where + F: FnOnce(&mut Program) -> Result { // NOTE: for Errors outside of f(prog), we should not rely on ?, so that we can keep the // From instance for ProgramError as general as possible. Instead, we should manually check // errors in the surrounding code in this function and only use ? for f(prog). @@ -203,19 +208,22 @@ pub fn with_program Result<(), E>>( prog.state = ProgramState::Running; // run closure with program argument - f(&mut prog)?; // if closure fails, transaction will be rolled back + // if closure fails, transaction will be rolled back + let ret = f(&mut prog); + // avoid committing if program failed + if ret.is_ok() { + prog.record_timestamps()?; + + // commit transaction + log::debug!(target: log_target, "Committing transaction and finalizing program"); + + prog.state = ProgramState::Finished; + log::debug!(target: log_target, "Set prog.state to Finished"); + drop(prog); // stop borrowing tx, since tx.commit() will consume tx + log::debug!(target: log_target, "Dropped prog"); + + tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })?; + } - prog.record_timestamps()?; - - // commit transaction - log::debug!(target: log_target, "Committing transaction and finalizing program"); - - prog.state = ProgramState::Finished; - log::debug!(target: log_target, "Set prog.state to Finished"); - drop(prog); // stop borrowing tx, since tx.commit() will consume tx - log::debug!(target: log_target, "Dropped prog"); - - tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })?; - - Ok(()) + Ok(ret) }