diff --git a/flake.nix b/flake.nix index 142a869..7bf9396 100644 --- a/flake.nix +++ b/flake.nix @@ -35,6 +35,7 @@ inputsFrom = builtins.attrValues self.packages.${system}; buildInputs = [ cargo + clippy rust-analyzer clippy rustfmt # linting sqlite openssl pkgconfig # for openssl-sys dep that gets pulled in diff --git a/src/db.rs b/src/db.rs index cf7b518..e59f50a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,29 +4,27 @@ //! extern crate derive_more; -use derive_more::{Display,Error,From}; +use derive_more::{Display, Error, From}; use once_cell::sync::Lazy; use rusqlite; use rusqlite::Connection; -use rusqlite_migration::{Error as RMError, Migrations, M, SchemaVersion}; +use rusqlite_migration::{Error as RMError, Migrations, SchemaVersion, M}; use uuid; -use std::num::{NonZeroUsize}; // for describing schema versions +use std::num::NonZeroUsize; // for describing schema versions +use std::path::{Path, PathBuf}; -// NOTE: apply pragmas outside of migrations, using e.g. -// conn.pragma_update(None, "foreign_keys", &"ON").unwrap(); -static MIGRATIONS: Lazy> = Lazy::new(|| - Migrations::new( - vec![ - M::up(include_str!("migrations/20221024_initial_schema.sql")), - ] -)); +static MIGRATIONS: Lazy> = Lazy::new(|| { + Migrations::new(vec![M::up(include_str!( + "migrations/20221024_initial_schema.sql" + ))]) +}); /// The current schema version as it will appear in the SQLite user_version pub static CURRENT_SCHEMA_VERSION: usize = 1; /// Error type for checking schema version of a Connection -#[derive(Debug,Display)] +#[derive(Debug, Display)] pub enum SchemaError { CurrentVersionError(RMError), NoSchemaVersionSet, @@ -43,17 +41,19 @@ impl From for SchemaError { pub fn schema_version(conn: &Connection) -> Result { match MIGRATIONS.current_version(conn)? { SchemaVersion::NoneSet => Err(SchemaError::NoSchemaVersionSet), - SchemaVersion::Inside(v) => // A known version is set. Check if it is current + SchemaVersion::Inside(v) => + // A known version is set. Check if it is current + { if v.get() == CURRENT_SCHEMA_VERSION { Ok(v) } else { Err(SchemaError::OldSchema(v)) - }, + } + } SchemaVersion::Outside(v) => Err(SchemaError::OutsideSchema(v)), } } - /// Initialize the database starting with an empty schema. /// /// This function initializes a database to the latest schema, and also generates a new random UUID @@ -68,7 +68,6 @@ pub fn init(conn: &mut Connection) -> Result { Ok(u) } - #[derive(Debug)] pub struct SchemaUpdateResult { pub old_version: Option, @@ -80,8 +79,10 @@ pub struct SchemaUpdateResult { /// If migrate is true, migrate the schema and return. /// /// If migrate is false, throw an Error if schema is not current. -pub fn ensure_schema(conn: &mut Connection, migrate: bool) - -> Result { +pub fn ensure_schema( + conn: &mut Connection, + migrate: bool, +) -> Result { let old_version = schema_version(conn)?; let current_version = unsafe { NonZeroUsize::new_unchecked(CURRENT_SCHEMA_VERSION) }; Ok(SchemaUpdateResult { @@ -91,14 +92,57 @@ pub fn ensure_schema(conn: &mut Connection, migrate: bool) }) } -#[derive(Debug,Display,Error,From)] +#[derive(Debug, Display, Error, From)] pub enum LocalUuidError { StatementPrepareError(rusqlite::Error), UuidError(uuid::Error), } /// Find the UUID of the dataset representing the directory containing the connected 'nancy.db' pub fn local_uuid(conn: &Connection) -> Result { - let uuid = conn.prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid' LIMIT 1")? + let uuid = conn + .prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid' LIMIT 1")? .query_row([], |row| row.get(0))?; Ok(uuid) } + +#[derive(Debug)] +pub enum FindDatasetResult { + /// All searched paths belong to an existing dataset at this directory + ExistingDataset(PathBuf), + /// Searched paths belong to multiple existing datasets (or some may belong to None) + MultipleDatasets(Vec>), + /// None of the search paths belongs to an existing dataset. Returned path is the nearest + /// common ancestor of all searched paths (on same filesystem) + NoDataset(PathBuf), +} + +#[derive(Debug)] +pub enum FindDatasetError { + /// The provided paths lie on different filesystems, so no common ancestor can be defined + DifferentFilesystems, + /// An empty list of paths was provided + NoPathsProvided, +} + +/// Given a collection of paths, find a common directory containing them +pub fn find_dataset_dir(paths: &[PathBuf]) -> Result { + match paths.get(0) { + None => Err(FindDatasetError::NoPathsProvided), + Some(p) => { + let mut ds_dirs: Vec = Vec::new(); + let mut ancestor = p; + + // for each canonicalized path from paths + // look at parent directories until either: + // - parent dir found in ds_dirs + // - nancy.db found + // - reached root of filesystem (no dataset found) + // also, until we've found a common ancestor, take parent of ancestor. + for p in paths { + let c = p.canonicalize(); + } + + Ok(FindDatasetResult::ExistingDataset(PathBuf::from("."))) + } + } +} diff --git a/src/fs.rs b/src/fs.rs index f76a3b9..3869e26 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1,8 +1,7 @@ //use jwalk; use log; -use sha2::{Sha256, Digest, digest::FixedOutput}; -use std::path::{Path}; - +use sha2::{digest::FixedOutput, Digest, Sha256}; +use std::path::Path; #[derive(Debug)] pub enum RecordError { diff --git a/src/lib.rs b/src/lib.rs index 0ce237b..a57a407 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ use std::io; -use rusqlite::{Connection}; -use uuid::{Uuid}; +use rusqlite::Connection; +use uuid::Uuid; //pub mod data; pub mod db; diff --git a/src/main.rs b/src/main.rs index be264bc..7c63bb9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,8 @@ use clap::{Parser, Subcommand}; -use env_logger; -use log; -use rusqlite::{Connection}; +use rusqlite::{Connection, OpenFlags}; -use nancy; - -use std::{process}; -use std::path::{PathBuf}; +use std::path::PathBuf; +use std::process; // Composable provenance tracking for scientific data analysis #[derive(Parser)] @@ -24,7 +20,7 @@ enum Commands { /// If not within an existing dataset, create one #[arg(short, long)] initialize: bool, - /// Use + /// A short descriptive message for this recording, i.e. "Re-run with lr=1e-3" #[arg(short, long)] message: String, #[arg()] @@ -35,7 +31,7 @@ enum Commands { /// Just say hello Hello { //#[arg(short, long)] - //fromlib: bool, + //fromlib: bool, }, } @@ -49,54 +45,120 @@ fn main() { match nancy::print_uuid() { Ok(_) => { println!("OK"); - }, + } Err(e) => { println!("SQLite error: {}", e); - }, + } }; } - Some(Commands::Record { initialize, message, record_paths }) => { + Some(Commands::Record { + initialize, + message, + record_paths, + }) => { // Determine dataset dir (ds_dir) + let mut conn = match nancy::db::find_dataset_dir(record_paths) { + Err(e) => { + log::error!("Could not determine dataset directory: {:?}", e); + process::exit(1); + } + Ok(res) => { + match res { + nancy::db::FindDatasetResult::MultipleDatasets(_) => { + log::error!("Provided paths belong to multiple datasets."); + process::exit(1); + } + nancy::db::FindDatasetResult::NoDataset(path) => { + // initialize + log::info!("No dataset at or above nearest ancestor path: {:?}", path); + let dbpath = &path.join("nancy.db"); + let mut c = match Connection::open(dbpath) { + Err(e) => { + log::error!( + "Could not open new SQLite database at {dbpath:?}." + ); + process::exit(1); + } + Ok(cc) => cc, + }; + c.pragma_update(None, "foreign_keys", &"ON").unwrap(); + match nancy::db::init(&mut c) { + Err(e) => { + log::error!( + "Encountered error in initializing schema: {:?}", + e + ); + process::exit(1); + } + Ok(dataset_uuid) => { + log::trace!("Init OK"); + log::info!("Dataset UUID is {dataset_uuid}"); + // Run an empty program so that the dataset log reflects when it was + // initialized + nancy::program::with_program( + &mut c, + "INIT", + "Initialize dataset", + |_| { + let okres: Result<(), ()> = Ok(()); + okres + }, + ) + .expect("Empty program should not throw error"); + } + } + c + } + nancy::db::FindDatasetResult::ExistingDataset(path) => { + // existing + log::info!("Found existing dataset at path: {:?}", path); + let dbpath = &path.join("nancy.db"); + // open with flags to prevent creating when we believe the db exists + let mut c = match Connection::open_with_flags( + dbpath, + OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX, + ) { + Err(e) => { + log::error!( + "Could not open existing SQLite database at {dbpath:?}." + ); + process::exit(1); + } + Ok(cc) => cc, + }; + c.pragma_update(None, "foreign_keys", &"ON").unwrap(); + match nancy::db::ensure_schema(&mut c, true) { + Err(e) => { + log::error!("Error ensuring schema: {}", e); + process::exit(1); + } + Ok(update_result) => { + log::debug!("Schema ensured: {:?}", update_result); + } + } + c + } + } + } + }; // If no paths given, use ["."] for the following steps. // First, for each requested record_path, find whether it exists within a dataset dir // Ensure that all paths are in the same dataset dir, or none belong to a dataset dir // If no dataset dirs found, get top directory containing them all and set this as ds_dir let mut conn = Connection::open_in_memory().expect("Could not create in-memory db"); + // If initializing: - match nancy::db::init(&mut conn) { - Err(e) => { - log::error!("Encountered error in initializing schema: {:?}", e); - process::exit(1); - } - Ok(dataset_uuid) => { - log::trace!("Init OK"); - log::info!("Dataset UUID is {dataset_uuid}"); - nancy::program::with_program( - &mut conn, - "INIT", - "Initialize dataset", - |prog| { - let okres: Result<(), ()> = Ok(()); - okres - } - ).expect("Empty program should not throw error"); - } - } - match nancy::program::with_program(&mut conn, "RECORD", message, |prog| { + // If not initializing, ensure the schema is up to date + + if let Err(e) = nancy::program::with_program(&mut conn, "RECORD", message, |prog| { let dataset_path = PathBuf::from("."); // Note that this may fail, in which case we should roll back only this program // but keep the dataset initialized. - nancy::fs::record( - &dataset_path, - message, - ) + nancy::fs::record(&dataset_path, message) }) { - Err(e) => { - log::error!("Encountered error in RECORD program: {:?}", e); - process::exit(3); - }, - Ok(()) => (), - } + log::error!("Encountered error in RECORD program: {:?}", e); + process::exit(1); + }; } Some(Commands::Status {}) => { println!("status not yet implemented"); diff --git a/src/program.rs b/src/program.rs index 9ccbad3..420e696 100644 --- a/src/program.rs +++ b/src/program.rs @@ -1,7 +1,7 @@ use log; -use rusqlite::{Connection,Error as RSQLError}; +use rusqlite::{Connection, Error as RSQLError}; extern crate derive_more; -use derive_more::{From}; +use derive_more::From; use std::time::{Instant, SystemTime}; @@ -18,12 +18,11 @@ pub struct TaskInput { datum: T, } -pub struct Program { -} +pub struct Program {} impl Program { //fn new(conn: &Connection) -> Self { - //Program { - //} + //Program { + //} //} //fn new_task(self: &Program) -> Task { //} @@ -36,20 +35,22 @@ pub enum ProgramError { CommitFailed(RSQLError), } impl From for ProgramError { - fn from(e: E) -> ProgramError { ProgramError::ProgramFailed(e) } + fn from(e: E) -> ProgramError { + ProgramError::ProgramFailed(e) + } } /// Run a closure as a program, within a database transaction -pub fn with_program Result<(), E>>( - conn: &mut Connection, - name: &str, - message: &str, - f: F, - ) -> Result<(), ProgramError> { +pub fn with_program Result<(), E>>( + conn: &mut Connection, + name: &str, + message: &str, + f: F, +) -> Result<(), ProgramError> { // NOTE: for Errors outside of f(prog), we should not rely on ?, so that we can keep the // From instance for ProgramError as general as possible. Instead, we should manually check // errors in the surrounding code in this function and only use ? for f(prog). - + let log_target = &format!("nancy.program ({})", name); log::info!(target: log_target, "Running program {} ({})", name, message); @@ -62,10 +63,7 @@ pub fn with_program Result<(), E>>( log::debug!(target: log_target, "Starting timers"); let start_st = SystemTime::now(); let start = Instant::now(); - let start_stamp = timing::persistent_stamp( - Instant::now(), - start, - start_st); + let start_stamp = timing::persistent_stamp(Instant::now(), start, start_st); // Instantiate Program // (record name and message for new program, get program ID) @@ -73,18 +71,27 @@ pub fn with_program Result<(), E>>( // run closure with program argument f(prog)?; // if closure fails, transaction will be rolled back - + // end timer // Commit scope for RECORD program let end = Instant::now(); - log::debug!(target: log_target, "Start time: {:?} f64 timestamp={:#?}", start_st, start_stamp); - log::debug!(target: log_target, "Elapsed: {} seconds", (end - start).as_secs_f64()); + log::debug!( + target: log_target, + "Start time: {:?} f64 timestamp={:#?}", + start_st, + start_stamp + ); + log::debug!( + target: log_target, + "Elapsed: {} seconds", + (end - start).as_secs_f64() + ); // record end time // commit transaction match tx.commit() { Err(e) => Err(ProgramError::CommitFailed(e)), Ok(_) => Ok(()), } - }, + } } } diff --git a/src/timing.rs b/src/timing.rs index 263b83b..2eafb69 100644 --- a/src/timing.rs +++ b/src/timing.rs @@ -3,7 +3,8 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH}; /// This converts an Instant to a double-precision float, which as of October 2022 has roughly 300 /// ns of resolution. pub fn persistent_stamp(instant: Instant, base_instant: Instant, base_systime: SystemTime) -> f64 { - base_systime.checked_add(instant - base_instant) + base_systime + .checked_add(instant - base_instant) .expect("Bounds error in std::time::SystemTime::checked_add") .duration_since(UNIX_EPOCH) .expect("Clock may have gone backward")