Merge branch 'master' of git.jhink.org:jacob/nancyrs

This commit is contained in:
Jacob Hinkle 2022-11-14 12:41:05 -05:00
commit e39dddef5c
4 changed files with 166 additions and 107 deletions

View File

@ -12,7 +12,7 @@ use std::io::{Error as IOError, Read};
/// Some simple namespaces to use for deriving UUID v5 keys
const NAMESPACE_MACHINE: Uuid = Uuid::from_bytes([1; 16]);
#[derive(Debug)]
#[derive(Clone,Debug)]
pub struct Machine {
pub key: Uuid,
pub machine_id: String,
@ -35,7 +35,7 @@ impl Machine {
os_type: os_type,
}
}
pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> {
pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> {
tx.execute(
"INSERT OR IGNORE INTO machine VALUES (?1, ?2, ?3, ?4)",
(
@ -49,7 +49,7 @@ impl Machine {
}
}
#[derive(Debug)]
#[derive(Clone,Debug)]
pub struct User {
pub key: Uuid,
pub machine: Machine,
@ -72,7 +72,7 @@ impl User {
realname: realname,
}
}
pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> {
pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> {
self.machine.record(tx);
tx.execute(
"INSERT OR IGNORE INTO user VALUES (?1, ?2, ?3, ?4)",
@ -87,7 +87,7 @@ impl User {
}
}
#[derive(Debug)]
#[derive(Clone,Debug)]
pub struct Environment {
pub key: Uuid,
pub user: User,
@ -127,9 +127,7 @@ impl Environment {
additional_info: addl_info,
}
}
//fn load(tx: &Transaction, key: Uuid) -> Self {
//}
pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> {
pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> {
self.user.record(tx);
tx.execute(
"INSERT OR IGNORE INTO environment VALUES (?1, ?2, ?3, ?4, NULL, ?5)",

View File

@ -146,7 +146,7 @@ fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> {
/// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for
/// directories and insert entries into current_files table as we pass over them. This means
/// computing the sha256 key, storing it along with parent, filetype, and symlink_target.
fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
let mut insert_stmt = tx.prepare(
"INSERT OR IGNORE INTO current_files
(sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time)
@ -234,7 +234,7 @@ fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<
/// Compute content hashes for all files in the current_files temp table. Symlinks and "Other"
/// types will have NULL hashes, and directory hashes will be computed in a separate step.
fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> {
fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> {
log::trace!("insert_file_content_hashes");
// Extract all regular files (key and path) from current_files, in arbitrary order, and compute
// content hashes for each, then update the corresponding entry in current_files.
@ -285,7 +285,7 @@ fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> {
}
pub fn record(
tx: &mut Transaction,
tx: &Transaction,
paths: &[PathBuf],
ds_root: &Path,
message: &str,

View File

@ -58,9 +58,10 @@ fn init_schema(conn: &mut Connection) -> Uuid {
conn,
"INIT",
"Initialize dataset",
|prog, _tx| {
|prog| {
let _ = prog.perform_task(&[], |task| {
log::debug!("INIT task UUID is {}", task.key);
Ok::<(), ()>(())
});
let okres: Result<(), ()> = Ok(());
okres
@ -75,13 +76,13 @@ fn init_schema(conn: &mut Connection) -> Uuid {
}
fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) -> () {
if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog, tx| {
prog.perform_task(&[], |_task| {
// Note that this may fail, in which case we should roll back only this program
// but keep the dataset initialized.
nancy::fs::record(tx, paths, &dataset_path, message)
})
}) {
if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog| {
prog.perform_task(&[], |_task| {
// Note that this may fail, in which case we should roll back only this program
// but keep the dataset initialized.
nancy::fs::record(prog.transaction, paths, &dataset_path, message)
})
}) {
log::error!("Encountered error in RECORD program: {:?}", e);
process::exit(1);
};

View File

@ -11,8 +11,6 @@ use crate::timing;
#[derive(Debug)]
pub struct Task {
pub key: Uuid,
pub start: Instant,
pub end: Instant,
}
#[derive(Debug)]
@ -20,78 +18,37 @@ pub struct TaskInput {
pub task: Task,
//datum: data::Datum,
}
#[derive(Debug)]
pub struct Program {
pub key: Uuid,
pub name: String,
pub message: String,
pub environment: Environment,
}
impl Program {
pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Result<Program, RSError> {
impl Task {
pub fn new(program: &Program) -> Result<Task, RSError> {
let key = Uuid::new_v4();
log::debug!("New {} Program with UUID {}", name, key);
log::debug!("New Task with UUID {}", key);
let env = Environment::current(None);
env.record(tx)?;
log::debug!("Environment: {:#?}", env);
/*
start_time REAL,
end_time REAL,
environment BLOB NOT NULL REFERENCES environment ON UPDATE CASCADE,
message TEXT NOT NULL -- user-defined message to help distinguish similar runs
*/
tx.execute(
"INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)",
// NOTE: we currently do not save the "function" column
program.transaction.execute(
"INSERT INTO task VALUES (?1, ?2, NULL)",
(
key.as_bytes(),
env.key.as_bytes(),
name,
message,
program.key.as_bytes(),
),
)?;
Ok(Program {
Ok(Task {
key: key,
name: name.to_string(),
message: message.to_string(),
environment: env,
})
}
pub fn perform_task<R, F>(self: &Program, inputs: &[TaskInput], f: F) -> R
where
F: FnOnce(&Task) -> R,
{
let start = Instant::now();
let u = Uuid::new_v4();
let mut task = Task {
key: u,
start: start,
end: start,
};
let res = f(&task);
task.end = Instant::now();
res
}
pub fn record_timestamps(self: &Self, tx: &mut Transaction, start_stamp: f64, end_stamp: f64) ->
Result<(), RSError> {
tx.execute("UPDATE program SET start_time = ?1, end_time = ?2 WHERE key = ?3",
(start_stamp, end_stamp, self.key.as_bytes()),
)?;
Ok(())
}
}
#[derive(Debug)]
pub enum ProgramError<E> {
CreateTransactionFailed(RSError),
NewProgramError(RSError),
NewProgramFailed(RSError),
ProgramFailed(E),
RecordEnvFailed(RSError),
InsertProgramFailed(RSError),
RecordTimestampsFailed(RSError),
NewTaskFailed(RSError),
CommitFailed(RSError),
PerformedTaskWhileNotRunning,
}
impl<E> From<E> for ProgramError<E> {
fn from(e: E) -> ProgramError<E> {
@ -99,8 +56,126 @@ impl<E> From<E> for ProgramError<E> {
}
}
#[derive(Clone,Debug,PartialEq)]
enum ProgramState {
Initialized,
Running,
Finished,
Error,
}
#[derive(Debug)]
pub struct Program<'conn> {
pub key: Uuid,
pub name: String,
pub message: String,
pub environment: Environment,
pub transaction: &'conn Transaction<'conn>,
start_systemtime: SystemTime,
start_instant: Instant,
start_stamp: f64,
state: ProgramState,
}
impl<'conn> Program<'conn> {
pub fn new<E>(tx: &'conn Transaction, name: &str, message: &str) -> Result<Program<'conn>, ProgramError<E>> {
let log_target = &format!("nancy.program ({})", name);
// start transaction
// start timer
log::debug!(target: log_target, "Starting timers");
let start_st = SystemTime::now();
let start = Instant::now();
let start_stamp = timing::persistent_stamp(start, start, start_st);
log::debug!(
target: log_target,
"Start time: {:?} f64 timestamp={:#?}",
start_st,
start_stamp
);
let key = Uuid::new_v4();
log::debug!("New {} Program with UUID {}", name, key);
let mut prog = Program {
key: key,
name: name.to_string(),
message: message.to_string(),
environment: Environment::current(None),
transaction: tx,
start_systemtime: start_st,
start_instant: start,
start_stamp: start_stamp,
state: ProgramState::Initialized,
};
prog.environment.record(prog.transaction)
.map_err(|e| ProgramError::RecordEnvFailed(e))?;
log::debug!("Environment: {:#?}", prog.environment);
prog.transaction.execute(
"INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)",
(
key.as_bytes(),
prog.environment.key.as_bytes(),
name,
message,
),
).map_err(|e| ProgramError::InsertProgramFailed(e))?;
Ok(prog)
}
pub fn perform_task<E, R, F>(self: &mut Self, inputs: &[TaskInput], f: F) -> Result<R, ProgramError<E>>
where
F: FnOnce(&mut Task) -> Result<R, E>,
{
if self.state != ProgramState::Running {
log::error!("Performed task in state={:?}", self.state);
return Err(ProgramError::PerformedTaskWhileNotRunning);
}
let mut task = Task::new(self).map_err(|e| ProgramError::NewTaskFailed(e))?;
f(&mut task).map_err(|e| ProgramError::ProgramFailed(e))
}
pub fn record_timestamps<E>(self: &mut Self) -> Result<(), ProgramError<E>> {
let log_target = &format!("nancy.program ({})", self.name);
// record end time
let end = Instant::now();
log::debug!(
target: log_target,
"Elapsed: {} seconds",
(end - self.start_instant).as_secs_f64()
);
let end_stamp = timing::persistent_stamp(
end,
self.start_instant,
self.start_systemtime,
);
self.transaction.execute("UPDATE program SET end_time = ?1 WHERE key = ?2",
(end_stamp, self.key.as_bytes()),
).map_err(|e| ProgramError::RecordTimestampsFailed(e))?;
Ok(())
}
pub fn finish<E>(self: &mut Self) -> Result<(), ProgramError<E>> {
let log_target = &format!("nancy.program ({})", self.name);
self.record_timestamps()?;
Ok(())
}
}
impl<'conn> Drop for Program<'conn> {
/// Checks that the program has been cleaned up by calling the .finish() (which might)
fn drop(self: &mut Self) {
if self.state != ProgramState::Finished {
log::error!("Program reached destructor with unfinished state={:?}", self.state);
}
}
}
/// Run a closure as a program, within a database transaction
pub fn with_program<E, F: FnOnce(&Program, &mut Transaction) -> Result<(), E>>(
pub fn with_program<E, F: FnOnce(&mut Program) -> Result<(), E>>(
conn: &mut Connection,
name: &str,
message: &str,
@ -119,43 +194,28 @@ pub fn with_program<E, F: FnOnce(&Program, &mut Transaction) -> Result<(), E>>(
message
);
// start transaction
let mut tx = conn.transaction()
.map_err(|e| { ProgramError::CreateTransactionFailed(e) })?;
// start timer
log::debug!(target: log_target, "Starting timers");
let start_st = SystemTime::now();
let start = Instant::now();
let start_stamp = timing::persistent_stamp(start, start, start_st);
let tx = conn.transaction()
.map_err(|e| { ProgramError::CreateTransactionFailed(e) })?;
log::debug!(
target: log_target,
"Start time: {:?} f64 timestamp={:#?}",
start_st,
start_stamp
);
// Instantiate Program
// (record name and message for new program, get program ID)
let prog = Program::new(&mut tx, name, message).map_err(|e| { ProgramError::NewProgramError(e) })?;
let mut prog = Program::new(&tx, name, message)?;
log::info!(target: log_target, "Running {:?}", prog);
prog.state = ProgramState::Running;
// run closure with program argument
f(&prog, &mut tx)?; // if closure fails, transaction will be rolled back
f(&mut prog)?; // if closure fails, transaction will be rolled back
prog.record_timestamps()?;
// end timer
// Commit scope for RECORD program
let end = Instant::now();
log::debug!(
target: log_target,
"Elapsed: {} seconds",
(end - start).as_secs_f64()
);
// record end time
let end_stamp = timing::persistent_stamp(end, start, start_st);
prog.record_timestamps(&mut tx, start_stamp, end_stamp).map_err(|e| {ProgramError::RecordTimestampsFailed(e) })?;
// commit transaction
log::debug!(target: log_target, "Committing transaction and finalizing program");
tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })
prog.state = ProgramState::Finished;
log::debug!(target: log_target, "Set prog.state to Finished");
drop(prog); // stop borrowing tx, since tx.commit() will consume tx
log::debug!(target: log_target, "Dropped prog");
tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })?;
Ok(())
}