Compare commits
2 Commits
93164baf85
...
2006453617
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2006453617 | ||
|
|
e39dddef5c |
@ -12,7 +12,7 @@ use std::io::{Error as IOError, Read};
|
|||||||
/// Some simple namespaces to use for deriving UUID v5 keys
|
/// Some simple namespaces to use for deriving UUID v5 keys
|
||||||
const NAMESPACE_MACHINE: Uuid = Uuid::from_bytes([1; 16]);
|
const NAMESPACE_MACHINE: Uuid = Uuid::from_bytes([1; 16]);
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Clone,Debug)]
|
||||||
pub struct Machine {
|
pub struct Machine {
|
||||||
pub key: Uuid,
|
pub key: Uuid,
|
||||||
pub machine_id: String,
|
pub machine_id: String,
|
||||||
@ -35,7 +35,7 @@ impl Machine {
|
|||||||
os_type: os_type,
|
os_type: os_type,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> {
|
pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> {
|
||||||
tx.execute(
|
tx.execute(
|
||||||
"INSERT OR IGNORE INTO machine VALUES (?1, ?2, ?3, ?4)",
|
"INSERT OR IGNORE INTO machine VALUES (?1, ?2, ?3, ?4)",
|
||||||
(
|
(
|
||||||
@ -49,7 +49,7 @@ impl Machine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Clone,Debug)]
|
||||||
pub struct User {
|
pub struct User {
|
||||||
pub key: Uuid,
|
pub key: Uuid,
|
||||||
pub machine: Machine,
|
pub machine: Machine,
|
||||||
@ -72,7 +72,7 @@ impl User {
|
|||||||
realname: realname,
|
realname: realname,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> {
|
pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> {
|
||||||
self.machine.record(tx);
|
self.machine.record(tx);
|
||||||
tx.execute(
|
tx.execute(
|
||||||
"INSERT OR IGNORE INTO user VALUES (?1, ?2, ?3, ?4)",
|
"INSERT OR IGNORE INTO user VALUES (?1, ?2, ?3, ?4)",
|
||||||
@ -87,7 +87,7 @@ impl User {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Clone,Debug)]
|
||||||
pub struct Environment {
|
pub struct Environment {
|
||||||
pub key: Uuid,
|
pub key: Uuid,
|
||||||
pub user: User,
|
pub user: User,
|
||||||
@ -127,9 +127,7 @@ impl Environment {
|
|||||||
additional_info: addl_info,
|
additional_info: addl_info,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//fn load(tx: &Transaction, key: Uuid) -> Self {
|
pub fn record(self: &Self, tx: &Transaction) -> RSResult<()> {
|
||||||
//}
|
|
||||||
pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> {
|
|
||||||
self.user.record(tx);
|
self.user.record(tx);
|
||||||
tx.execute(
|
tx.execute(
|
||||||
"INSERT OR IGNORE INTO environment VALUES (?1, ?2, ?3, ?4, NULL, ?5)",
|
"INSERT OR IGNORE INTO environment VALUES (?1, ?2, ?3, ?4, NULL, ?5)",
|
||||||
|
|||||||
@ -146,7 +146,7 @@ fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> {
|
|||||||
/// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for
|
/// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for
|
||||||
/// directories and insert entries into current_files table as we pass over them. This means
|
/// directories and insert entries into current_files table as we pass over them. This means
|
||||||
/// computing the sha256 key, storing it along with parent, filetype, and symlink_target.
|
/// computing the sha256 key, storing it along with parent, filetype, and symlink_target.
|
||||||
fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
|
fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
|
||||||
let mut insert_stmt = tx.prepare(
|
let mut insert_stmt = tx.prepare(
|
||||||
"INSERT OR IGNORE INTO current_files
|
"INSERT OR IGNORE INTO current_files
|
||||||
(sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time)
|
(sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time)
|
||||||
@ -234,7 +234,7 @@ fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<
|
|||||||
|
|
||||||
/// Compute content hashes for all files in the current_files temp table. Symlinks and "Other"
|
/// Compute content hashes for all files in the current_files temp table. Symlinks and "Other"
|
||||||
/// types will have NULL hashes, and directory hashes will be computed in a separate step.
|
/// types will have NULL hashes, and directory hashes will be computed in a separate step.
|
||||||
fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> {
|
fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> {
|
||||||
log::trace!("insert_file_content_hashes");
|
log::trace!("insert_file_content_hashes");
|
||||||
// Extract all regular files (key and path) from current_files, in arbitrary order, and compute
|
// Extract all regular files (key and path) from current_files, in arbitrary order, and compute
|
||||||
// content hashes for each, then update the corresponding entry in current_files.
|
// content hashes for each, then update the corresponding entry in current_files.
|
||||||
@ -285,7 +285,7 @@ fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn record(
|
pub fn record(
|
||||||
tx: &mut Transaction,
|
tx: &Transaction,
|
||||||
paths: &[PathBuf],
|
paths: &[PathBuf],
|
||||||
ds_root: &Path,
|
ds_root: &Path,
|
||||||
message: &str,
|
message: &str,
|
||||||
|
|||||||
17
src/main.rs
17
src/main.rs
@ -58,9 +58,10 @@ fn init_schema(conn: &mut Connection) -> Uuid {
|
|||||||
conn,
|
conn,
|
||||||
"INIT",
|
"INIT",
|
||||||
"Initialize dataset",
|
"Initialize dataset",
|
||||||
|prog, _tx| {
|
|prog| {
|
||||||
let _ = prog.perform_task(&[], |task| {
|
let _ = prog.perform_task(&[], |task| {
|
||||||
log::debug!("INIT task UUID is {}", task.key);
|
log::debug!("INIT task UUID is {}", task.key);
|
||||||
|
Ok::<(), ()>(())
|
||||||
});
|
});
|
||||||
let okres: Result<(), ()> = Ok(());
|
let okres: Result<(), ()> = Ok(());
|
||||||
okres
|
okres
|
||||||
@ -75,13 +76,13 @@ fn init_schema(conn: &mut Connection) -> Uuid {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) -> () {
|
fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) -> () {
|
||||||
if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog, tx| {
|
if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog| {
|
||||||
prog.perform_task(&[], |_task| {
|
prog.perform_task(&[], |_task| {
|
||||||
// Note that this may fail, in which case we should roll back only this program
|
// Note that this may fail, in which case we should roll back only this program
|
||||||
// but keep the dataset initialized.
|
// but keep the dataset initialized.
|
||||||
nancy::fs::record(tx, paths, &dataset_path, message)
|
nancy::fs::record(prog.transaction, paths, &dataset_path, message)
|
||||||
})
|
})
|
||||||
}) {
|
}) {
|
||||||
log::error!("Encountered error in RECORD program: {:?}", e);
|
log::error!("Encountered error in RECORD program: {:?}", e);
|
||||||
process::exit(1);
|
process::exit(1);
|
||||||
};
|
};
|
||||||
|
|||||||
236
src/program.rs
236
src/program.rs
@ -11,8 +11,6 @@ use crate::timing;
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Task {
|
pub struct Task {
|
||||||
pub key: Uuid,
|
pub key: Uuid,
|
||||||
pub start: Instant,
|
|
||||||
pub end: Instant,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -20,78 +18,37 @@ pub struct TaskInput {
|
|||||||
pub task: Task,
|
pub task: Task,
|
||||||
//datum: data::Datum,
|
//datum: data::Datum,
|
||||||
}
|
}
|
||||||
|
impl Task {
|
||||||
#[derive(Debug)]
|
pub fn new(program: &Program) -> Result<Task, RSError> {
|
||||||
pub struct Program {
|
|
||||||
pub key: Uuid,
|
|
||||||
pub name: String,
|
|
||||||
pub message: String,
|
|
||||||
pub environment: Environment,
|
|
||||||
}
|
|
||||||
impl Program {
|
|
||||||
pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Result<Program, RSError> {
|
|
||||||
let key = Uuid::new_v4();
|
let key = Uuid::new_v4();
|
||||||
log::debug!("New {} Program with UUID {}", name, key);
|
log::debug!("New Task with UUID {}", key);
|
||||||
|
|
||||||
let env = Environment::current(None);
|
// NOTE: we currently do not save the "function" column
|
||||||
env.record(tx)?;
|
program.transaction.execute(
|
||||||
log::debug!("Environment: {:#?}", env);
|
"INSERT INTO task VALUES (?1, ?2, NULL)",
|
||||||
|
|
||||||
/*
|
|
||||||
start_time REAL,
|
|
||||||
end_time REAL,
|
|
||||||
|
|
||||||
environment BLOB NOT NULL REFERENCES environment ON UPDATE CASCADE,
|
|
||||||
message TEXT NOT NULL -- user-defined message to help distinguish similar runs
|
|
||||||
*/
|
|
||||||
tx.execute(
|
|
||||||
"INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)",
|
|
||||||
(
|
(
|
||||||
key.as_bytes(),
|
key.as_bytes(),
|
||||||
env.key.as_bytes(),
|
program.key.as_bytes(),
|
||||||
name,
|
|
||||||
message,
|
|
||||||
),
|
),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(Program {
|
Ok(Task {
|
||||||
key: key,
|
key: key,
|
||||||
name: name.to_string(),
|
|
||||||
message: message.to_string(),
|
|
||||||
environment: env,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
pub fn perform_task<R, F>(self: &Program, inputs: &[TaskInput], f: F) -> R
|
|
||||||
where
|
|
||||||
F: FnOnce(&Task) -> R,
|
|
||||||
{
|
|
||||||
let start = Instant::now();
|
|
||||||
let u = Uuid::new_v4();
|
|
||||||
let mut task = Task {
|
|
||||||
key: u,
|
|
||||||
start: start,
|
|
||||||
end: start,
|
|
||||||
};
|
|
||||||
let res = f(&task);
|
|
||||||
task.end = Instant::now();
|
|
||||||
res
|
|
||||||
}
|
|
||||||
pub fn record_timestamps(self: &Self, tx: &mut Transaction, start_stamp: f64, end_stamp: f64) ->
|
|
||||||
Result<(), RSError> {
|
|
||||||
tx.execute("UPDATE program SET start_time = ?1, end_time = ?2 WHERE key = ?3",
|
|
||||||
(start_stamp, end_stamp, self.key.as_bytes()),
|
|
||||||
)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum ProgramError<E> {
|
pub enum ProgramError<E> {
|
||||||
CreateTransactionFailed(RSError),
|
CreateTransactionFailed(RSError),
|
||||||
NewProgramError(RSError),
|
NewProgramFailed(RSError),
|
||||||
ProgramFailed(E),
|
ProgramFailed(E),
|
||||||
|
RecordEnvFailed(RSError),
|
||||||
|
InsertProgramFailed(RSError),
|
||||||
RecordTimestampsFailed(RSError),
|
RecordTimestampsFailed(RSError),
|
||||||
|
NewTaskFailed(RSError),
|
||||||
CommitFailed(RSError),
|
CommitFailed(RSError),
|
||||||
|
PerformedTaskWhileNotRunning,
|
||||||
}
|
}
|
||||||
impl<E> From<E> for ProgramError<E> {
|
impl<E> From<E> for ProgramError<E> {
|
||||||
fn from(e: E) -> ProgramError<E> {
|
fn from(e: E) -> ProgramError<E> {
|
||||||
@ -99,8 +56,126 @@ impl<E> From<E> for ProgramError<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone,Debug,PartialEq)]
|
||||||
|
enum ProgramState {
|
||||||
|
Initialized,
|
||||||
|
Running,
|
||||||
|
Finished,
|
||||||
|
Error,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Program<'conn> {
|
||||||
|
pub key: Uuid,
|
||||||
|
pub name: String,
|
||||||
|
pub message: String,
|
||||||
|
pub environment: Environment,
|
||||||
|
pub transaction: &'conn Transaction<'conn>,
|
||||||
|
start_systemtime: SystemTime,
|
||||||
|
start_instant: Instant,
|
||||||
|
start_stamp: f64,
|
||||||
|
state: ProgramState,
|
||||||
|
}
|
||||||
|
impl<'conn> Program<'conn> {
|
||||||
|
pub fn new<E>(tx: &'conn Transaction, name: &str, message: &str) -> Result<Program<'conn>, ProgramError<E>> {
|
||||||
|
let log_target = &format!("nancy.program ({})", name);
|
||||||
|
|
||||||
|
// start transaction
|
||||||
|
// start timer
|
||||||
|
log::debug!(target: log_target, "Starting timers");
|
||||||
|
let start_st = SystemTime::now();
|
||||||
|
let start = Instant::now();
|
||||||
|
let start_stamp = timing::persistent_stamp(start, start, start_st);
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
target: log_target,
|
||||||
|
"Start time: {:?} f64 timestamp={:#?}",
|
||||||
|
start_st,
|
||||||
|
start_stamp
|
||||||
|
);
|
||||||
|
|
||||||
|
let key = Uuid::new_v4();
|
||||||
|
log::debug!("New {} Program with UUID {}", name, key);
|
||||||
|
|
||||||
|
let mut prog = Program {
|
||||||
|
key: key,
|
||||||
|
name: name.to_string(),
|
||||||
|
message: message.to_string(),
|
||||||
|
environment: Environment::current(None),
|
||||||
|
transaction: tx,
|
||||||
|
start_systemtime: start_st,
|
||||||
|
start_instant: start,
|
||||||
|
start_stamp: start_stamp,
|
||||||
|
state: ProgramState::Initialized,
|
||||||
|
};
|
||||||
|
|
||||||
|
prog.environment.record(prog.transaction)
|
||||||
|
.map_err(|e| ProgramError::RecordEnvFailed(e))?;
|
||||||
|
log::debug!("Environment: {:#?}", prog.environment);
|
||||||
|
|
||||||
|
prog.transaction.execute(
|
||||||
|
"INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)",
|
||||||
|
(
|
||||||
|
key.as_bytes(),
|
||||||
|
prog.environment.key.as_bytes(),
|
||||||
|
name,
|
||||||
|
message,
|
||||||
|
),
|
||||||
|
).map_err(|e| ProgramError::InsertProgramFailed(e))?;
|
||||||
|
|
||||||
|
Ok(prog)
|
||||||
|
}
|
||||||
|
pub fn perform_task<E, R, F>(self: &mut Self, inputs: &[TaskInput], f: F) -> Result<R, ProgramError<E>>
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut Task) -> Result<R, E>,
|
||||||
|
{
|
||||||
|
if self.state != ProgramState::Running {
|
||||||
|
log::error!("Performed task in state={:?}", self.state);
|
||||||
|
return Err(ProgramError::PerformedTaskWhileNotRunning);
|
||||||
|
}
|
||||||
|
let mut task = Task::new(self).map_err(|e| ProgramError::NewTaskFailed(e))?;
|
||||||
|
f(&mut task).map_err(|e| ProgramError::ProgramFailed(e))
|
||||||
|
}
|
||||||
|
pub fn record_timestamps<E>(self: &mut Self) -> Result<(), ProgramError<E>> {
|
||||||
|
let log_target = &format!("nancy.program ({})", self.name);
|
||||||
|
|
||||||
|
// record end time
|
||||||
|
let end = Instant::now();
|
||||||
|
log::debug!(
|
||||||
|
target: log_target,
|
||||||
|
"Elapsed: {} seconds",
|
||||||
|
(end - self.start_instant).as_secs_f64()
|
||||||
|
);
|
||||||
|
let end_stamp = timing::persistent_stamp(
|
||||||
|
end,
|
||||||
|
self.start_instant,
|
||||||
|
self.start_systemtime,
|
||||||
|
);
|
||||||
|
|
||||||
|
self.transaction.execute("UPDATE program SET end_time = ?1 WHERE key = ?2",
|
||||||
|
(end_stamp, self.key.as_bytes()),
|
||||||
|
).map_err(|e| ProgramError::RecordTimestampsFailed(e))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub fn finish<E>(self: &mut Self) -> Result<(), ProgramError<E>> {
|
||||||
|
let log_target = &format!("nancy.program ({})", self.name);
|
||||||
|
|
||||||
|
self.record_timestamps()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<'conn> Drop for Program<'conn> {
|
||||||
|
/// Checks that the program has been cleaned up by calling the .finish() (which might)
|
||||||
|
fn drop(self: &mut Self) {
|
||||||
|
if self.state != ProgramState::Finished {
|
||||||
|
log::error!("Program reached destructor with unfinished state={:?}", self.state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Run a closure as a program, within a database transaction
|
/// Run a closure as a program, within a database transaction
|
||||||
pub fn with_program<E, F: FnOnce(&Program, &mut Transaction) -> Result<(), E>>(
|
pub fn with_program<E, F: FnOnce(&mut Program) -> Result<(), E>>(
|
||||||
conn: &mut Connection,
|
conn: &mut Connection,
|
||||||
name: &str,
|
name: &str,
|
||||||
message: &str,
|
message: &str,
|
||||||
@ -119,43 +194,28 @@ pub fn with_program<E, F: FnOnce(&Program, &mut Transaction) -> Result<(), E>>(
|
|||||||
message
|
message
|
||||||
);
|
);
|
||||||
|
|
||||||
// start transaction
|
let tx = conn.transaction()
|
||||||
let mut tx = conn.transaction()
|
.map_err(|e| { ProgramError::CreateTransactionFailed(e) })?;
|
||||||
.map_err(|e| { ProgramError::CreateTransactionFailed(e) })?;
|
|
||||||
// start timer
|
|
||||||
log::debug!(target: log_target, "Starting timers");
|
|
||||||
let start_st = SystemTime::now();
|
|
||||||
let start = Instant::now();
|
|
||||||
let start_stamp = timing::persistent_stamp(start, start, start_st);
|
|
||||||
|
|
||||||
log::debug!(
|
let mut prog = Program::new(&tx, name, message)?;
|
||||||
target: log_target,
|
|
||||||
"Start time: {:?} f64 timestamp={:#?}",
|
|
||||||
start_st,
|
|
||||||
start_stamp
|
|
||||||
);
|
|
||||||
|
|
||||||
// Instantiate Program
|
|
||||||
// (record name and message for new program, get program ID)
|
|
||||||
let prog = Program::new(&mut tx, name, message).map_err(|e| { ProgramError::NewProgramError(e) })?;
|
|
||||||
|
|
||||||
log::info!(target: log_target, "Running {:?}", prog);
|
log::info!(target: log_target, "Running {:?}", prog);
|
||||||
|
prog.state = ProgramState::Running;
|
||||||
|
|
||||||
// run closure with program argument
|
// run closure with program argument
|
||||||
f(&prog, &mut tx)?; // if closure fails, transaction will be rolled back
|
f(&mut prog)?; // if closure fails, transaction will be rolled back
|
||||||
|
|
||||||
|
prog.record_timestamps()?;
|
||||||
|
|
||||||
// end timer
|
|
||||||
// Commit scope for RECORD program
|
|
||||||
let end = Instant::now();
|
|
||||||
log::debug!(
|
|
||||||
target: log_target,
|
|
||||||
"Elapsed: {} seconds",
|
|
||||||
(end - start).as_secs_f64()
|
|
||||||
);
|
|
||||||
// record end time
|
|
||||||
let end_stamp = timing::persistent_stamp(end, start, start_st);
|
|
||||||
prog.record_timestamps(&mut tx, start_stamp, end_stamp).map_err(|e| {ProgramError::RecordTimestampsFailed(e) })?;
|
|
||||||
// commit transaction
|
// commit transaction
|
||||||
log::debug!(target: log_target, "Committing transaction and finalizing program");
|
log::debug!(target: log_target, "Committing transaction and finalizing program");
|
||||||
tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })
|
|
||||||
|
prog.state = ProgramState::Finished;
|
||||||
|
log::debug!(target: log_target, "Set prog.state to Finished");
|
||||||
|
drop(prog); // stop borrowing tx, since tx.commit() will consume tx
|
||||||
|
log::debug!(target: log_target, "Dropped prog");
|
||||||
|
|
||||||
|
tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user