diff --git a/src/environment.rs b/src/environment.rs index 6b4a28d..416cde5 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -12,7 +12,7 @@ use std::io::{Error as IOError, Read}; /// Some simple namespaces to use for deriving UUID v5 keys const NAMESPACE_MACHINE: Uuid = Uuid::from_bytes([1; 16]); -#[derive(Debug)] +#[derive(Clone,Debug)] pub struct Machine { pub key: Uuid, pub machine_id: String, @@ -35,7 +35,7 @@ impl Machine { os_type: os_type, } } - pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> { + pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> { tx.execute( "INSERT OR IGNORE INTO machine VALUES (?1, ?2, ?3, ?4)", ( @@ -49,7 +49,7 @@ impl Machine { } } -#[derive(Debug)] +#[derive(Clone,Debug)] pub struct User { pub key: Uuid, pub machine: Machine, @@ -72,7 +72,7 @@ impl User { realname: realname, } } - pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> { + pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> { self.machine.record(tx); tx.execute( "INSERT OR IGNORE INTO user VALUES (?1, ?2, ?3, ?4)", @@ -87,7 +87,7 @@ impl User { } } -#[derive(Debug)] +#[derive(Clone,Debug)] pub struct Environment { pub key: Uuid, pub user: User, @@ -127,9 +127,7 @@ impl Environment { additional_info: addl_info, } } - //fn load(tx: &Transaction, key: Uuid) -> Self { - //} - pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> { + pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> { self.user.record(tx); tx.execute( "INSERT OR IGNORE INTO environment VALUES (?1, ?2, ?3, ?4, NULL, ?5)", diff --git a/src/fs.rs b/src/fs.rs index 39b957b..01eb8d2 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -146,7 +146,7 @@ fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> { /// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for /// directories and insert entries into current_files table as we pass over them. This means /// computing the sha256 key, storing it along with parent, filetype, and symlink_target. -fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> { +fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> { let mut insert_stmt = tx.prepare( "INSERT OR IGNORE INTO current_files (sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time) @@ -234,7 +234,7 @@ fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result< /// Compute content hashes for all files in the current_files temp table. Symlinks and "Other" /// types will have NULL hashes, and directory hashes will be computed in a separate step. -fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> { +fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> { log::trace!("insert_file_content_hashes"); // Extract all regular files (key and path) from current_files, in arbitrary order, and compute // content hashes for each, then update the corresponding entry in current_files. @@ -285,7 +285,7 @@ fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> { } pub fn record( - tx: &mut Transaction, + tx: &Transaction, paths: &[PathBuf], ds_root: &Path, message: &str, diff --git a/src/main.rs b/src/main.rs index 193a68a..5ff4742 100644 --- a/src/main.rs +++ b/src/main.rs @@ -58,9 +58,10 @@ fn init_schema(conn: &mut Connection) -> Uuid { conn, "INIT", "Initialize dataset", - |prog, _tx| { + |prog| { let _ = prog.perform_task(&[], |task| { log::debug!("INIT task UUID is {}", task.key); + Ok::<(), ()>(()) }); let okres: Result<(), ()> = Ok(()); okres @@ -75,13 +76,13 @@ fn init_schema(conn: &mut Connection) -> Uuid { } fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) -> () { - if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog, tx| { - prog.perform_task(&[], |_task| { - // Note that this may fail, in which case we should roll back only this program - // but keep the dataset initialized. - nancy::fs::record(tx, paths, &dataset_path, message) - }) - }) { + if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog| { + prog.perform_task(&[], |_task| { + // Note that this may fail, in which case we should roll back only this program + // but keep the dataset initialized. + nancy::fs::record(prog.transaction, paths, &dataset_path, message) + }) + }) { log::error!("Encountered error in RECORD program: {:?}", e); process::exit(1); }; diff --git a/src/program.rs b/src/program.rs index d5a0099..81f14a8 100644 --- a/src/program.rs +++ b/src/program.rs @@ -11,8 +11,6 @@ use crate::timing; #[derive(Debug)] pub struct Task { pub key: Uuid, - pub start: Instant, - pub end: Instant, } #[derive(Debug)] @@ -20,78 +18,37 @@ pub struct TaskInput { pub task: Task, //datum: data::Datum, } - -#[derive(Debug)] -pub struct Program { - pub key: Uuid, - pub name: String, - pub message: String, - pub environment: Environment, -} -impl Program { - pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Result { +impl Task { + pub fn new(program: &Program) -> Result { let key = Uuid::new_v4(); - log::debug!("New {} Program with UUID {}", name, key); + log::debug!("New Task with UUID {}", key); - let env = Environment::current(None); - env.record(tx)?; - log::debug!("Environment: {:#?}", env); - - /* - start_time REAL, - end_time REAL, - - environment BLOB NOT NULL REFERENCES environment ON UPDATE CASCADE, - message TEXT NOT NULL -- user-defined message to help distinguish similar runs - */ - tx.execute( - "INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)", + // NOTE: we currently do not save the "function" column + program.transaction.execute( + "INSERT INTO task VALUES (?1, ?2, NULL)", ( key.as_bytes(), - env.key.as_bytes(), - name, - message, + program.key.as_bytes(), ), )?; - Ok(Program { + Ok(Task { key: key, - name: name.to_string(), - message: message.to_string(), - environment: env, }) } - pub fn perform_task(self: &Program, inputs: &[TaskInput], f: F) -> R - where - F: FnOnce(&Task) -> R, - { - let start = Instant::now(); - let u = Uuid::new_v4(); - let mut task = Task { - key: u, - start: start, - end: start, - }; - let res = f(&task); - task.end = Instant::now(); - res - } - pub fn record_timestamps(self: &Self, tx: &mut Transaction, start_stamp: f64, end_stamp: f64) -> - Result<(), RSError> { - tx.execute("UPDATE program SET start_time = ?1, end_time = ?2 WHERE key = ?3", - (start_stamp, end_stamp, self.key.as_bytes()), - )?; - Ok(()) - } } #[derive(Debug)] pub enum ProgramError { CreateTransactionFailed(RSError), - NewProgramError(RSError), + NewProgramFailed(RSError), ProgramFailed(E), + RecordEnvFailed(RSError), + InsertProgramFailed(RSError), RecordTimestampsFailed(RSError), + NewTaskFailed(RSError), CommitFailed(RSError), + PerformedTaskWhileNotRunning, } impl From for ProgramError { fn from(e: E) -> ProgramError { @@ -99,8 +56,126 @@ impl From for ProgramError { } } +#[derive(Clone,Debug,PartialEq)] +enum ProgramState { + Initialized, + Running, + Finished, + Error, +} + +#[derive(Debug)] +pub struct Program<'conn> { + pub key: Uuid, + pub name: String, + pub message: String, + pub environment: Environment, + pub transaction: &'conn Transaction<'conn>, + start_systemtime: SystemTime, + start_instant: Instant, + start_stamp: f64, + state: ProgramState, +} +impl<'conn> Program<'conn> { + pub fn new(tx: &'conn Transaction, name: &str, message: &str) -> Result, ProgramError> { + let log_target = &format!("nancy.program ({})", name); + + // start transaction + // start timer + log::debug!(target: log_target, "Starting timers"); + let start_st = SystemTime::now(); + let start = Instant::now(); + let start_stamp = timing::persistent_stamp(start, start, start_st); + + log::debug!( + target: log_target, + "Start time: {:?} f64 timestamp={:#?}", + start_st, + start_stamp + ); + + let key = Uuid::new_v4(); + log::debug!("New {} Program with UUID {}", name, key); + + let mut prog = Program { + key: key, + name: name.to_string(), + message: message.to_string(), + environment: Environment::current(None), + transaction: tx, + start_systemtime: start_st, + start_instant: start, + start_stamp: start_stamp, + state: ProgramState::Initialized, + }; + + prog.environment.record(prog.transaction) + .map_err(|e| ProgramError::RecordEnvFailed(e))?; + log::debug!("Environment: {:#?}", prog.environment); + + prog.transaction.execute( + "INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)", + ( + key.as_bytes(), + prog.environment.key.as_bytes(), + name, + message, + ), + ).map_err(|e| ProgramError::InsertProgramFailed(e))?; + + Ok(prog) + } + pub fn perform_task(self: &mut Self, inputs: &[TaskInput], f: F) -> Result> + where + F: FnOnce(&mut Task) -> Result, + { + if self.state != ProgramState::Running { + log::error!("Performed task in state={:?}", self.state); + return Err(ProgramError::PerformedTaskWhileNotRunning); + } + let mut task = Task::new(self).map_err(|e| ProgramError::NewTaskFailed(e))?; + f(&mut task).map_err(|e| ProgramError::ProgramFailed(e)) + } + pub fn record_timestamps(self: &mut Self) -> Result<(), ProgramError> { + let log_target = &format!("nancy.program ({})", self.name); + + // record end time + let end = Instant::now(); + log::debug!( + target: log_target, + "Elapsed: {} seconds", + (end - self.start_instant).as_secs_f64() + ); + let end_stamp = timing::persistent_stamp( + end, + self.start_instant, + self.start_systemtime, + ); + + self.transaction.execute("UPDATE program SET end_time = ?1 WHERE key = ?2", + (end_stamp, self.key.as_bytes()), + ).map_err(|e| ProgramError::RecordTimestampsFailed(e))?; + + Ok(()) + } + pub fn finish(self: &mut Self) -> Result<(), ProgramError> { + let log_target = &format!("nancy.program ({})", self.name); + + self.record_timestamps()?; + Ok(()) + } +} +impl<'conn> Drop for Program<'conn> { + /// Checks that the program has been cleaned up by calling the .finish() (which might) + fn drop(self: &mut Self) { + if self.state != ProgramState::Finished { + log::error!("Program reached destructor with unfinished state={:?}", self.state); + } + } +} + /// Run a closure as a program, within a database transaction -pub fn with_program Result<(), E>>( +pub fn with_program Result<(), E>>( conn: &mut Connection, name: &str, message: &str, @@ -119,43 +194,28 @@ pub fn with_program Result<(), E>>( message ); - // start transaction - let mut tx = conn.transaction() - .map_err(|e| { ProgramError::CreateTransactionFailed(e) })?; - // start timer - log::debug!(target: log_target, "Starting timers"); - let start_st = SystemTime::now(); - let start = Instant::now(); - let start_stamp = timing::persistent_stamp(start, start, start_st); + let tx = conn.transaction() + .map_err(|e| { ProgramError::CreateTransactionFailed(e) })?; - log::debug!( - target: log_target, - "Start time: {:?} f64 timestamp={:#?}", - start_st, - start_stamp - ); - - // Instantiate Program - // (record name and message for new program, get program ID) - let prog = Program::new(&mut tx, name, message).map_err(|e| { ProgramError::NewProgramError(e) })?; + let mut prog = Program::new(&tx, name, message)?; log::info!(target: log_target, "Running {:?}", prog); + prog.state = ProgramState::Running; // run closure with program argument - f(&prog, &mut tx)?; // if closure fails, transaction will be rolled back + f(&mut prog)?; // if closure fails, transaction will be rolled back + + prog.record_timestamps()?; - // end timer - // Commit scope for RECORD program - let end = Instant::now(); - log::debug!( - target: log_target, - "Elapsed: {} seconds", - (end - start).as_secs_f64() - ); - // record end time - let end_stamp = timing::persistent_stamp(end, start, start_st); - prog.record_timestamps(&mut tx, start_stamp, end_stamp).map_err(|e| {ProgramError::RecordTimestampsFailed(e) })?; // commit transaction log::debug!(target: log_target, "Committing transaction and finalizing program"); - tx.commit().map_err(|e| { ProgramError::CommitFailed(e) }) + + prog.state = ProgramState::Finished; + log::debug!(target: log_target, "Set prog.state to Finished"); + drop(prog); // stop borrowing tx, since tx.commit() will consume tx + log::debug!(target: log_target, "Dropped prog"); + + tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })?; + + Ok(()) }