Start on find_dataset_dir. Format with cargo fmt

This commit is contained in:
Jacob Hinkle 2022-10-26 15:00:39 -04:00
parent 3556590f7b
commit a38bc78093
7 changed files with 204 additions and 90 deletions

View File

@ -35,6 +35,7 @@
inputsFrom = builtins.attrValues self.packages.${system};
buildInputs = [
cargo
clippy
rust-analyzer clippy rustfmt # linting
sqlite
openssl pkgconfig # for openssl-sys dep that gets pulled in

View File

@ -4,29 +4,27 @@
//!
extern crate derive_more;
use derive_more::{Display,Error,From};
use derive_more::{Display, Error, From};
use once_cell::sync::Lazy;
use rusqlite;
use rusqlite::Connection;
use rusqlite_migration::{Error as RMError, Migrations, M, SchemaVersion};
use rusqlite_migration::{Error as RMError, Migrations, SchemaVersion, M};
use uuid;
use std::num::{NonZeroUsize}; // for describing schema versions
use std::num::NonZeroUsize; // for describing schema versions
use std::path::{Path, PathBuf};
// NOTE: apply pragmas outside of migrations, using e.g.
// conn.pragma_update(None, "foreign_keys", &"ON").unwrap();
static MIGRATIONS: Lazy<Migrations<'static>> = Lazy::new(||
Migrations::new(
vec![
M::up(include_str!("migrations/20221024_initial_schema.sql")),
]
));
static MIGRATIONS: Lazy<Migrations<'static>> = Lazy::new(|| {
Migrations::new(vec![M::up(include_str!(
"migrations/20221024_initial_schema.sql"
))])
});
/// The current schema version as it will appear in the SQLite user_version
pub static CURRENT_SCHEMA_VERSION: usize = 1;
/// Error type for checking schema version of a Connection
#[derive(Debug,Display)]
#[derive(Debug, Display)]
pub enum SchemaError {
CurrentVersionError(RMError),
NoSchemaVersionSet,
@ -43,17 +41,19 @@ impl From<RMError> for SchemaError {
pub fn schema_version(conn: &Connection) -> Result<NonZeroUsize, SchemaError> {
match MIGRATIONS.current_version(conn)? {
SchemaVersion::NoneSet => Err(SchemaError::NoSchemaVersionSet),
SchemaVersion::Inside(v) => // A known version is set. Check if it is current
SchemaVersion::Inside(v) =>
// A known version is set. Check if it is current
{
if v.get() == CURRENT_SCHEMA_VERSION {
Ok(v)
} else {
Err(SchemaError::OldSchema(v))
},
}
}
SchemaVersion::Outside(v) => Err(SchemaError::OutsideSchema(v)),
}
}
/// Initialize the database starting with an empty schema.
///
/// This function initializes a database to the latest schema, and also generates a new random UUID
@ -68,7 +68,6 @@ pub fn init(conn: &mut Connection) -> Result<uuid::Uuid, RMError> {
Ok(u)
}
#[derive(Debug)]
pub struct SchemaUpdateResult {
pub old_version: Option<NonZeroUsize>,
@ -80,8 +79,10 @@ pub struct SchemaUpdateResult {
/// If migrate is true, migrate the schema and return.
///
/// If migrate is false, throw an Error if schema is not current.
pub fn ensure_schema(conn: &mut Connection, migrate: bool)
-> Result<SchemaUpdateResult, SchemaError> {
pub fn ensure_schema(
conn: &mut Connection,
migrate: bool,
) -> Result<SchemaUpdateResult, SchemaError> {
let old_version = schema_version(conn)?;
let current_version = unsafe { NonZeroUsize::new_unchecked(CURRENT_SCHEMA_VERSION) };
Ok(SchemaUpdateResult {
@ -91,14 +92,57 @@ pub fn ensure_schema(conn: &mut Connection, migrate: bool)
})
}
#[derive(Debug,Display,Error,From)]
#[derive(Debug, Display, Error, From)]
pub enum LocalUuidError {
StatementPrepareError(rusqlite::Error),
UuidError(uuid::Error),
}
/// Find the UUID of the dataset representing the directory containing the connected 'nancy.db'
pub fn local_uuid(conn: &Connection) -> Result<uuid::Uuid, LocalUuidError> {
let uuid = conn.prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid' LIMIT 1")?
let uuid = conn
.prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid' LIMIT 1")?
.query_row([], |row| row.get(0))?;
Ok(uuid)
}
#[derive(Debug)]
pub enum FindDatasetResult {
/// All searched paths belong to an existing dataset at this directory
ExistingDataset(PathBuf),
/// Searched paths belong to multiple existing datasets (or some may belong to None)
MultipleDatasets(Vec<Option<PathBuf>>),
/// None of the search paths belongs to an existing dataset. Returned path is the nearest
/// common ancestor of all searched paths (on same filesystem)
NoDataset(PathBuf),
}
#[derive(Debug)]
pub enum FindDatasetError {
/// The provided paths lie on different filesystems, so no common ancestor can be defined
DifferentFilesystems,
/// An empty list of paths was provided
NoPathsProvided,
}
/// Given a collection of paths, find a common directory containing them
pub fn find_dataset_dir(paths: &[PathBuf]) -> Result<FindDatasetResult, FindDatasetError> {
match paths.get(0) {
None => Err(FindDatasetError::NoPathsProvided),
Some(p) => {
let mut ds_dirs: Vec<PathBuf> = Vec::new();
let mut ancestor = p;
// for each canonicalized path from paths
// look at parent directories until either:
// - parent dir found in ds_dirs
// - nancy.db found
// - reached root of filesystem (no dataset found)
// also, until we've found a common ancestor, take parent of ancestor.
for p in paths {
let c = p.canonicalize();
}
Ok(FindDatasetResult::ExistingDataset(PathBuf::from(".")))
}
}
}

View File

@ -1,8 +1,7 @@
//use jwalk;
use log;
use sha2::{Sha256, Digest, digest::FixedOutput};
use std::path::{Path};
use sha2::{digest::FixedOutput, Digest, Sha256};
use std::path::Path;
#[derive(Debug)]
pub enum RecordError {

View File

@ -1,7 +1,7 @@
use std::io;
use rusqlite::{Connection};
use uuid::{Uuid};
use rusqlite::Connection;
use uuid::Uuid;
//pub mod data;
pub mod db;

View File

@ -1,12 +1,8 @@
use clap::{Parser, Subcommand};
use env_logger;
use log;
use rusqlite::{Connection};
use rusqlite::{Connection, OpenFlags};
use nancy;
use std::{process};
use std::path::{PathBuf};
use std::path::PathBuf;
use std::process;
// Composable provenance tracking for scientific data analysis
#[derive(Parser)]
@ -24,7 +20,7 @@ enum Commands {
/// If not within an existing dataset, create one
#[arg(short, long)]
initialize: bool,
/// Use
/// A short descriptive message for this recording, i.e. "Re-run with lr=1e-3"
#[arg(short, long)]
message: String,
#[arg()]
@ -35,7 +31,7 @@ enum Commands {
/// Just say hello
Hello {
//#[arg(short, long)]
//fromlib: bool,
//fromlib: bool,
},
}
@ -49,54 +45,120 @@ fn main() {
match nancy::print_uuid() {
Ok(_) => {
println!("OK");
},
}
Err(e) => {
println!("SQLite error: {}", e);
},
}
};
}
Some(Commands::Record { initialize, message, record_paths }) => {
Some(Commands::Record {
initialize,
message,
record_paths,
}) => {
// Determine dataset dir (ds_dir)
let mut conn = match nancy::db::find_dataset_dir(record_paths) {
Err(e) => {
log::error!("Could not determine dataset directory: {:?}", e);
process::exit(1);
}
Ok(res) => {
match res {
nancy::db::FindDatasetResult::MultipleDatasets(_) => {
log::error!("Provided paths belong to multiple datasets.");
process::exit(1);
}
nancy::db::FindDatasetResult::NoDataset(path) => {
// initialize
log::info!("No dataset at or above nearest ancestor path: {:?}", path);
let dbpath = &path.join("nancy.db");
let mut c = match Connection::open(dbpath) {
Err(e) => {
log::error!(
"Could not open new SQLite database at {dbpath:?}."
);
process::exit(1);
}
Ok(cc) => cc,
};
c.pragma_update(None, "foreign_keys", &"ON").unwrap();
match nancy::db::init(&mut c) {
Err(e) => {
log::error!(
"Encountered error in initializing schema: {:?}",
e
);
process::exit(1);
}
Ok(dataset_uuid) => {
log::trace!("Init OK");
log::info!("Dataset UUID is {dataset_uuid}");
// Run an empty program so that the dataset log reflects when it was
// initialized
nancy::program::with_program(
&mut c,
"INIT",
"Initialize dataset",
|_| {
let okres: Result<(), ()> = Ok(());
okres
},
)
.expect("Empty program should not throw error");
}
}
c
}
nancy::db::FindDatasetResult::ExistingDataset(path) => {
// existing
log::info!("Found existing dataset at path: {:?}", path);
let dbpath = &path.join("nancy.db");
// open with flags to prevent creating when we believe the db exists
let mut c = match Connection::open_with_flags(
dbpath,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
) {
Err(e) => {
log::error!(
"Could not open existing SQLite database at {dbpath:?}."
);
process::exit(1);
}
Ok(cc) => cc,
};
c.pragma_update(None, "foreign_keys", &"ON").unwrap();
match nancy::db::ensure_schema(&mut c, true) {
Err(e) => {
log::error!("Error ensuring schema: {}", e);
process::exit(1);
}
Ok(update_result) => {
log::debug!("Schema ensured: {:?}", update_result);
}
}
c
}
}
}
};
// If no paths given, use ["."] for the following steps.
// First, for each requested record_path, find whether it exists within a dataset dir
// Ensure that all paths are in the same dataset dir, or none belong to a dataset dir
// If no dataset dirs found, get top directory containing them all and set this as ds_dir
let mut conn = Connection::open_in_memory().expect("Could not create in-memory db");
// If initializing:
match nancy::db::init(&mut conn) {
Err(e) => {
log::error!("Encountered error in initializing schema: {:?}", e);
process::exit(1);
}
Ok(dataset_uuid) => {
log::trace!("Init OK");
log::info!("Dataset UUID is {dataset_uuid}");
nancy::program::with_program(
&mut conn,
"INIT",
"Initialize dataset",
|prog| {
let okres: Result<(), ()> = Ok(());
okres
}
).expect("Empty program should not throw error");
}
}
match nancy::program::with_program(&mut conn, "RECORD", message, |prog| {
// If not initializing, ensure the schema is up to date
if let Err(e) = nancy::program::with_program(&mut conn, "RECORD", message, |prog| {
let dataset_path = PathBuf::from(".");
// Note that this may fail, in which case we should roll back only this program
// but keep the dataset initialized.
nancy::fs::record(
&dataset_path,
message,
)
nancy::fs::record(&dataset_path, message)
}) {
Err(e) => {
log::error!("Encountered error in RECORD program: {:?}", e);
process::exit(3);
},
Ok(()) => (),
}
log::error!("Encountered error in RECORD program: {:?}", e);
process::exit(1);
};
}
Some(Commands::Status {}) => {
println!("status not yet implemented");

View File

@ -1,7 +1,7 @@
use log;
use rusqlite::{Connection,Error as RSQLError};
use rusqlite::{Connection, Error as RSQLError};
extern crate derive_more;
use derive_more::{From};
use derive_more::From;
use std::time::{Instant, SystemTime};
@ -18,12 +18,11 @@ pub struct TaskInput<T> {
datum: T,
}
pub struct Program {
}
pub struct Program {}
impl Program {
//fn new(conn: &Connection) -> Self {
//Program {
//}
//Program {
//}
//}
//fn new_task(self: &Program) -> Task {
//}
@ -36,16 +35,18 @@ pub enum ProgramError<E> {
CommitFailed(RSQLError),
}
impl<E> From<E> for ProgramError<E> {
fn from(e: E) -> ProgramError<E> { ProgramError::ProgramFailed(e) }
fn from(e: E) -> ProgramError<E> {
ProgramError::ProgramFailed(e)
}
}
/// Run a closure as a program, within a database transaction
pub fn with_program<E, F : FnOnce(Program) -> Result<(), E>>(
conn: &mut Connection,
name: &str,
message: &str,
f: F,
) -> Result<(), ProgramError<E>> {
pub fn with_program<E, F: FnOnce(Program) -> Result<(), E>>(
conn: &mut Connection,
name: &str,
message: &str,
f: F,
) -> Result<(), ProgramError<E>> {
// NOTE: for Errors outside of f(prog), we should not rely on ?, so that we can keep the
// From instance for ProgramError as general as possible. Instead, we should manually check
// errors in the surrounding code in this function and only use ? for f(prog).
@ -62,10 +63,7 @@ pub fn with_program<E, F : FnOnce(Program) -> Result<(), E>>(
log::debug!(target: log_target, "Starting timers");
let start_st = SystemTime::now();
let start = Instant::now();
let start_stamp = timing::persistent_stamp(
Instant::now(),
start,
start_st);
let start_stamp = timing::persistent_stamp(Instant::now(), start, start_st);
// Instantiate Program
// (record name and message for new program, get program ID)
@ -77,14 +75,23 @@ pub fn with_program<E, F : FnOnce(Program) -> Result<(), E>>(
// end timer
// Commit scope for RECORD program
let end = Instant::now();
log::debug!(target: log_target, "Start time: {:?} f64 timestamp={:#?}", start_st, start_stamp);
log::debug!(target: log_target, "Elapsed: {} seconds", (end - start).as_secs_f64());
log::debug!(
target: log_target,
"Start time: {:?} f64 timestamp={:#?}",
start_st,
start_stamp
);
log::debug!(
target: log_target,
"Elapsed: {} seconds",
(end - start).as_secs_f64()
);
// record end time
// commit transaction
match tx.commit() {
Err(e) => Err(ProgramError::CommitFailed(e)),
Ok(_) => Ok(()),
}
},
}
}
}

View File

@ -3,7 +3,8 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
/// This converts an Instant to a double-precision float, which as of October 2022 has roughly 300
/// ns of resolution.
pub fn persistent_stamp(instant: Instant, base_instant: Instant, base_systime: SystemTime) -> f64 {
base_systime.checked_add(instant - base_instant)
base_systime
.checked_add(instant - base_instant)
.expect("Bounds error in std::time::SystemTime::checked_add")
.duration_since(UNIX_EPOCH)
.expect("Clock may have gone backward")