Clean up interface to Program::perform_task()
This commit is contained in:
parent
88dd2bc220
commit
feab22026d
@ -15,7 +15,7 @@ use uuid;
|
||||
|
||||
use std::collections::{HashSet};
|
||||
use std::num::NonZeroUsize; // for describing schema versions
|
||||
use std::path::{Components, Path, PathBuf};
|
||||
use std::path::{PathBuf};
|
||||
|
||||
static MIGRATIONS: Lazy<Migrations<'static>> = Lazy::new(|| {
|
||||
Migrations::new(vec![M::up(include_str!(
|
||||
@ -77,13 +77,8 @@ pub struct SchemaUpdateResult {
|
||||
pub updated: bool,
|
||||
}
|
||||
/// Ensure that the schema in conn is current.
|
||||
///
|
||||
/// If migrate is true, migrate the schema and return.
|
||||
///
|
||||
/// If migrate is false, throw an Error if schema is not current.
|
||||
pub fn ensure_schema(
|
||||
conn: &mut Connection,
|
||||
migrate: bool,
|
||||
) -> Result<SchemaUpdateResult, SchemaError> {
|
||||
let old_version = schema_version(conn)?;
|
||||
let current_version = unsafe { NonZeroUsize::new_unchecked(CURRENT_SCHEMA_VERSION) };
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
//use jwalk;
|
||||
use log;
|
||||
use sha2::{digest::FixedOutput, Digest, Sha256};
|
||||
//use sha2::{digest::FixedOutput, Digest, Sha256};
|
||||
use std::path::Path;
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -9,6 +9,6 @@ pub enum RecordError {
|
||||
}
|
||||
|
||||
pub fn record(path: &Path, message: &String) -> Result<(), RecordError> {
|
||||
log::info!("Recording path {:?} using message \"{}\"", path, message);
|
||||
log::info!("Recording path {:?} with user-provided message \"{}\"", path, message);
|
||||
Err(RecordError::NotImplemented)
|
||||
}
|
||||
|
||||
27
src/main.rs
27
src/main.rs
@ -61,13 +61,19 @@ fn main() {
|
||||
// Determine dataset dir (ds_dir)
|
||||
let mut conn = match nancy::db::find_dataset_dir(record_paths) {
|
||||
Err(nancy::db::FindDatasetError::NoDataset(path)) => {
|
||||
// initialize
|
||||
log::info!("No dataset at or above nearest ancestor path: {:?}", path);
|
||||
if !initialize {
|
||||
log::error!("Refusing to initialize a new dataset at {path:?}. \
|
||||
Pass the -i or --initialize flag to request initialization.");
|
||||
process::exit(1);
|
||||
}
|
||||
let dbpath = &path.join("nancy.db");
|
||||
log::info!("Initializing new database at {:?}", dbpath);
|
||||
let mut c = match Connection::open(dbpath) {
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
"Could not open new SQLite database at {dbpath:?}."
|
||||
"Could not open new SQLite database at {dbpath:?}: {:?}",
|
||||
e
|
||||
);
|
||||
process::exit(1);
|
||||
}
|
||||
@ -91,7 +97,10 @@ fn main() {
|
||||
&mut c,
|
||||
"INIT",
|
||||
"Initialize dataset",
|
||||
|_| {
|
||||
|prog| {
|
||||
let _ = prog.perform_task(&[], |task| {
|
||||
log::debug!("INIT task UUID is {}", task.uuid);
|
||||
});
|
||||
let okres: Result<(), ()> = Ok(());
|
||||
okres
|
||||
},
|
||||
@ -123,7 +132,7 @@ fn main() {
|
||||
Ok(cc) => cc,
|
||||
};
|
||||
c.pragma_update(None, "foreign_keys", &"ON").unwrap();
|
||||
match nancy::db::ensure_schema(&mut c, true) {
|
||||
match nancy::db::ensure_schema(&mut c) {
|
||||
Err(e) => {
|
||||
log::error!("Error ensuring schema: {}", e);
|
||||
process::exit(1);
|
||||
@ -137,10 +146,12 @@ fn main() {
|
||||
};
|
||||
|
||||
if let Err(e) = nancy::program::with_program(&mut conn, "RECORD", message, |prog| {
|
||||
let dataset_path = PathBuf::from(".");
|
||||
// Note that this may fail, in which case we should roll back only this program
|
||||
// but keep the dataset initialized.
|
||||
nancy::fs::record(&dataset_path, message)
|
||||
prog.perform_task(&[], |_task| {
|
||||
let dataset_path = PathBuf::from(".");
|
||||
// Note that this may fail, in which case we should roll back only this program
|
||||
// but keep the dataset initialized.
|
||||
nancy::fs::record(&dataset_path, message)
|
||||
})
|
||||
}) {
|
||||
log::error!("Encountered error in RECORD program: {:?}", e);
|
||||
process::exit(1);
|
||||
|
||||
@ -1,31 +1,57 @@
|
||||
use log;
|
||||
use rusqlite::{Connection, Error as RSQLError};
|
||||
use rusqlite::{Connection, Error as RSQLError, Transaction};
|
||||
extern crate derive_more;
|
||||
use derive_more::From;
|
||||
use uuid::{Uuid};
|
||||
|
||||
use std::time::{Instant, SystemTime};
|
||||
|
||||
use crate::timing;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Task {
|
||||
start: Instant,
|
||||
end: Option<Instant>,
|
||||
uuid: String,
|
||||
pub uuid: Uuid,
|
||||
pub start: Instant,
|
||||
pub end: Instant,
|
||||
}
|
||||
|
||||
pub struct TaskInput<T> {
|
||||
task: Task,
|
||||
datum: T,
|
||||
#[derive(Debug)]
|
||||
pub struct TaskInput {
|
||||
pub task: Task,
|
||||
//datum: data::Datum,
|
||||
}
|
||||
|
||||
pub struct Program {}
|
||||
#[derive(Debug)]
|
||||
pub struct Program {
|
||||
pub uuid: Uuid,
|
||||
pub name: String,
|
||||
pub message: String,
|
||||
}
|
||||
impl Program {
|
||||
//fn new(conn: &Connection) -> Self {
|
||||
//Program {
|
||||
//}
|
||||
//}
|
||||
//fn new_task(self: &Program) -> Task {
|
||||
//}
|
||||
pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Program {
|
||||
let u = Uuid::new_v4();
|
||||
log::debug!("New {} Program with UUID {}", name, u);
|
||||
Program {
|
||||
uuid: u,
|
||||
name: name.to_string(),
|
||||
message: message.to_string(),
|
||||
}
|
||||
}
|
||||
pub fn perform_task<R, F>(self: &Program, inputs: &[TaskInput], f: F) -> R
|
||||
where
|
||||
F: FnOnce(&Task) -> R
|
||||
{
|
||||
let start = Instant::now();
|
||||
let u = Uuid::new_v4();
|
||||
let mut task = Task {
|
||||
uuid: u,
|
||||
start: start,
|
||||
end: start,
|
||||
};
|
||||
let res = f(&task);
|
||||
task.end = Instant::now();
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -53,12 +79,13 @@ pub fn with_program<E, F: FnOnce(Program) -> Result<(), E>>(
|
||||
|
||||
let log_target = &format!("nancy.program ({})", name);
|
||||
|
||||
log::info!(target: log_target, "Running program {} ({})", name, message);
|
||||
log::info!(target: log_target, "Preparing program {} ({})", name, message);
|
||||
|
||||
// start transaction
|
||||
match conn.transaction() {
|
||||
let txres = conn.transaction();
|
||||
match txres {
|
||||
Err(e) => Err(ProgramError::CreateTransactionFailed(e)),
|
||||
Ok(tx) => {
|
||||
Ok(mut tx) => {
|
||||
// start timer
|
||||
log::debug!(target: log_target, "Starting timers");
|
||||
let start_st = SystemTime::now();
|
||||
@ -67,7 +94,9 @@ pub fn with_program<E, F: FnOnce(Program) -> Result<(), E>>(
|
||||
|
||||
// Instantiate Program
|
||||
// (record name and message for new program, get program ID)
|
||||
let prog = Program {};
|
||||
let prog = Program::new(&mut tx, name, message);
|
||||
|
||||
log::info!(target: log_target, "Running {:?}", prog);
|
||||
|
||||
// run closure with program argument
|
||||
f(prog)?; // if closure fails, transaction will be rolled back
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user