From 3526cf1f21c13b183b007482d40e7eaf39d11a38 Mon Sep 17 00:00:00 2001 From: Jacob Hinkle Date: Fri, 11 Nov 2022 09:02:42 -0500 Subject: [PATCH] Persistent machine/user/env/program save to db --- src/environment.rs | 146 +++++++++++++++++++++ src/fs.rs | 14 +- src/lib.rs | 1 + src/main.rs | 12 +- src/migrations/20221024_initial_schema.sql | 66 ++++------ src/program.rs | 132 +++++++++++-------- 6 files changed, 258 insertions(+), 113 deletions(-) create mode 100644 src/environment.rs diff --git a/src/environment.rs b/src/environment.rs new file mode 100644 index 0000000..6b4a28d --- /dev/null +++ b/src/environment.rs @@ -0,0 +1,146 @@ +use machine_uid; +use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes}; +use sys_info; +use uuid::{Uuid}; +use whoami; + +use std::collections::{HashMap}; +use std::env; +use std::fs::{File}; +use std::io::{Error as IOError, Read}; + +/// Some simple namespaces to use for deriving UUID v5 keys +const NAMESPACE_MACHINE: Uuid = Uuid::from_bytes([1; 16]); + +#[derive(Debug)] +pub struct Machine { + pub key: Uuid, + pub machine_id: String, + pub hostname: String, + pub os_type: String, +} +impl Machine { + pub fn current() -> Self { + let hostname = sys_info::hostname().unwrap_or("Unknown".to_string()); + let mid = machine_uid::get().unwrap_or("Unknown".to_string()); + let os_type = sys_info::os_type().unwrap_or("Unknown".to_string()); + let key = Uuid::new_v5( + &NAMESPACE_MACHINE, + format!("{},{},{}", hostname, mid, os_type).as_bytes(), + ); + Machine { + key: key, + machine_id: mid, + hostname: hostname, + os_type: os_type, + } + } + pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> { + tx.execute( + "INSERT OR IGNORE INTO machine VALUES (?1, ?2, ?3, ?4)", + ( + &self.key.as_bytes(), + &self.machine_id, + &self.hostname, + &self.os_type, + ), + )?; + Ok(()) + } +} + +#[derive(Debug)] +pub struct User { + pub key: Uuid, + pub machine: Machine, + pub username: String, + pub realname: String, +} +impl User { + pub fn current() -> Self { + let machine = Machine::current(); + let username = whoami::username(); + let realname = whoami::realname(); + let key = Uuid::new_v5( + &machine.key, + format!("{}\n{}", username, realname).as_bytes(), + ); + User { + key: key, + machine: machine, + username: username, + realname: realname, + } + } + pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> { + self.machine.record(tx); + tx.execute( + "INSERT OR IGNORE INTO user VALUES (?1, ?2, ?3, ?4)", + ( + &self.key.as_bytes(), + &self.machine.key.as_bytes(), + &self.username, + &self.realname, + ), + )?; + Ok(()) + } +} + +#[derive(Debug)] +pub struct Environment { + pub key: Uuid, + pub user: User, + pub os_release: String, + //pub timezone: String, + pub linux_os_release: String, + //pub env_vars: HashMap, + pub additional_info: Option, +} +impl Environment { + pub fn current(addl_info: Option) -> Self { + let lor_result: Result = ( + || { + let mut s = String::new(); + let mut f = File::open("/etc/os-release")?; + f.read_to_string(&mut s)?; + Ok(s) + })(); + let lor = lor_result.unwrap_or("Unknown".to_string()); + let user = User::current(); + let os_release = sys_info::os_release().unwrap_or("Unknown".to_string()); + //let env_vars = HashMap::from_iter(env::vars()); + let key = Uuid::new_v5( + &user.key, + format!( + "{}\n{}", + os_release, + lor, + ).as_bytes(), + ); + Environment { + key: key, + user: user, + os_release: os_release, + linux_os_release: lor, + //env_vars: env_vars, + additional_info: addl_info, + } + } + //fn load(tx: &Transaction, key: Uuid) -> Self { + //} + pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> { + self.user.record(tx); + tx.execute( + "INSERT OR IGNORE INTO environment VALUES (?1, ?2, ?3, ?4, NULL, ?5)", + ( + &self.key.as_bytes(), + &self.user.key.as_bytes(), + &self.os_release, + &self.linux_os_release, + &self.additional_info, + ), + )?; + Ok(()) + } +} diff --git a/src/fs.rs b/src/fs.rs index b6ab52c..39b957b 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -174,7 +174,6 @@ fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result< .to_str() .expect("File name will be convertable to a UTF-8 string"); - log::debug!("{:?} : finding target", path); let symlink_target = if filetype == FileType::Symlink { Some( std::fs::read_link(&path)? @@ -183,7 +182,6 @@ fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result< .to_string() ) } else { None }; - log::debug!("{:?} : target = {:?}", path, symlink_target); if entry.depth > depth { // descending into directory @@ -201,16 +199,6 @@ fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result< //log::debug!("Up"); parent = dirstack.pop().unwrap(); depth -= 1; - - log::debug!( - " parent={} ed={} d={}", - &(match parent { - Some(h) => format!("{}", h), - None => "None".to_string(), - })[..8], - entry.depth, - depth, - ); } let mut ctx = Context::new(&SHA256); @@ -268,7 +256,7 @@ fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> { let hashes: Vec> = allpaths.par_iter().map(|fpres| { if let Ok((id, filepath)) = fpres { let input = File::open(filepath)?; - let mut reader = BufReader::new(input); + let reader = BufReader::new(input); let (chash, size_bytes) = buffered_hash256(reader)?; Ok((*id, chash, size_bytes, Instant::now())) } else { // should never happen diff --git a/src/lib.rs b/src/lib.rs index a57a407..ccbb603 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ use uuid::Uuid; //pub mod data; pub mod db; +pub mod environment; pub mod fs; pub mod program; pub mod timing; diff --git a/src/main.rs b/src/main.rs index 5d4dddd..193a68a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,19 +54,21 @@ fn init_schema(conn: &mut Connection) -> Uuid { log::info!("Dataset UUID is {dataset_uuid}"); // Run an empty program so that the dataset log reflects when it was // initialized - nancy::program::with_program( + if let Err(e) = nancy::program::with_program( conn, "INIT", "Initialize dataset", |prog, _tx| { let _ = prog.perform_task(&[], |task| { - log::debug!("INIT task UUID is {}", task.uuid); + log::debug!("INIT task UUID is {}", task.key); }); let okres: Result<(), ()> = Ok(()); okres }, - ) - .expect("Empty program should not throw error"); + ) { + log::error!("Empty program error: {e:?}"); + process::exit(1); + } dataset_uuid } } @@ -130,7 +132,7 @@ fn main() { let u = init_schema(&mut conn); do_record( &mut conn, - &format!("Initial record of dataset {:?}", u), + &format!("Initial recording of dataset {:?}", u), &[dataset_path.to_path_buf()], dataset_path, ); diff --git a/src/migrations/20221024_initial_schema.sql b/src/migrations/20221024_initial_schema.sql index 20144d3..f742284 100644 --- a/src/migrations/20221024_initial_schema.sql +++ b/src/migrations/20221024_initial_schema.sql @@ -30,25 +30,22 @@ CREATE TABLE triggers( -- type are included but not the kernel version. Software that changes due to -- updates should be included in the "environment" table instead. CREATE TABLE machine( - sha256 BLOB PRIMARY KEY NOT NULL, + key BLOB PRIMARY KEY NOT NULL, machine_id TEXT, -- platform-dependent unique hardware id -- Linux: open('/etc/machine-id', 'r').read() (assumes systemd) -- OSX: `ioreg -rd1 -c IOPlatformExpertDevice | grep IOPlatformUUID | awk '{$print $3}' | tr -d \"` -- Windows: `reg query HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Cryptography /v MachineGuid` - hostname TEXT, -- platform.node(): 'lucky' - system TEXT, -- platform.system(): 'Linux' - cpu_type TEXT, -- platform.machine(): 'x86_64' - processor TEXT -- platform.processor(): + hostname TEXT, -- e.g. 'lucky' + os_type TEXT -- e.g. 'Linux' ); -- Programs are run by users on machines CREATE TABLE user( - sha256 BLOB PRIMARY KEY NOT NULL, + key BLOB PRIMARY KEY NOT NULL, + machine BLOB NOT NULL REFERENCES machine ON UPDATE CASCADE, username TEXT NOT NULL, -- getpass.getuser() - userid INTEGER, -- os.getuid() - fullname TEXT, -- on Linux/OSX: pwd.getpwuid(os.getuid()).pw_gecos + realname TEXT -- on Linux/OSX: pwd.getpwuid(os.getuid()).pw_gecos -- on Windows: see https://stackoverflow.com/questions/21766954/how-to-get-windows-users-full-name-in-python - machine BLOB NOT NULL REFERENCES machine ON UPDATE CASCADE ); CREATE INDEX FK_user_machine ON user (machine); @@ -68,7 +65,7 @@ CREATE INDEX FK_user_machine ON user (machine); -- https://datatracker.ietf.org/doc/html/rfc4122.html -- This is possible in Python by simply calling uuid.uuid4() with no arguments CREATE TABLE dataset ( - uuid BLOB PRIMARY KEY NOT NULL -- UUID generated by uuid.uuid4() + key BLOB PRIMARY KEY NOT NULL -- UUID generated by uuid.uuid4() ); @@ -104,7 +101,7 @@ END; -- course dependent on filetype). Each version has a number, and was provided by -- some program (and potentially a datum). CREATE TABLE filedir_version ( - uuid BLOB PRIMARY KEY NOT NULL, + key BLOB PRIMARY KEY NOT NULL, filedir BLOB NOT NULL REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry @@ -146,27 +143,13 @@ END; END; -- python executable being used, and environment variables are recorded here. -- Other info is available in the parent "machine" table. CREATE TABLE environment ( - sha256 BLOB PRIMARY KEY NOT NULL, - - envvars_json TEXT, -- json.dumps(dict(os.environ), sort_keys=True) - python_implementation TEXT, -- platform.python_implementation(): 'cpython' - python_strversion TEXT, -- sys.version: '3.9.7 (default, Sep 16 2021, 13:09:58) \n[GCC 7.5.0]' - python_hexversion INTEGER, -- sys.hexversion: 50923504 - user TEXT REFERENCES user ON UPDATE CASCADE, + key BLOB PRIMARY KEY NOT NULL, + user BLOB NOT NULL REFERENCES user ON UPDATE CASCADE, + os_release TEXT, + linux_os_release TEXT, timezone TEXT, -- timezone, for interpreting event times - platform_release TEXT, -- platform.release(): '5.15.64' - freedesktop_os_release TEXT, -- requires python 3.10 - -- platform.freedesktop_os_release() as JSON - -- "{'NAME': 'NixOS', 'ID': 'nixos', - -- 'PRETTY_NAME': 'NixOS 22.05 (Quokka)', 'BUG_REPORT_URL': - -- 'https://github.com/NixOS/nixpkgs/issues', 'BUILD_ID': - -- '22.05.20220902.67e4507', 'DOCUMENTATION_URL': - -- 'https://nixos.org/learn.html', 'HOME_URL': 'https://nixos.org/', 'LOGO': - -- 'nix-snowflake', 'SUPPORT_URL': 'https://nixos.org/community.html', - -- 'VERSION': '22.05 (Quokka)', 'VERSION_CODENAME': 'quokka', 'VERSION_ID': - -- '22.05'}" - win32_ver TEXT, -- platform.win32_ver() as JSON - mac_ver TEXT -- platform.mac_ver() as JSON + --envvars_json TEXT + additional_info TEXT -- other env context, possibly provided by a language wrapper (e.g. python version info) ); @@ -176,18 +159,15 @@ CREATE TABLE environment ( -- programs are automatically imported and merged when possible when loading a -- "datum" from disk. CREATE TABLE program ( - uuid BLOB PRIMARY KEY NOT NULL, + key BLOB PRIMARY KEY NOT NULL, + environment BLOB NOT NULL REFERENCES environment ON UPDATE CASCADE, name TEXT, -- name of the program, usually written lowercase by calling code e.g. cnn_crossval -- Names of built-in operations will be shown in upper case: e.g. 'FREEZE' - + message TEXT NOT NULL, -- user-defined message to help distinguish similar runs -- we use POSIX timestamps for time recording. -- e.g. datetime.datetime.now().timestamp() start_time REAL, - end_time REAL, - - process_id INTEGER, -- host PID of python process on host OS - environment BLOB NOT NULL REFERENCES environment ON UPDATE CASCADE, - message TEXT NOT NULL -- user-defined message to help distinguish similar runs + end_time REAL ); CREATE INDEX FK_program_environment ON program (environment); @@ -221,7 +201,7 @@ CREATE TABLE func( ); CREATE INDEX FK_func_module ON func (module); CREATE TABLE func_input( - uuid BLOB PRIMARY KEY NOT NULL, + key BLOB PRIMARY KEY NOT NULL, name TEXT NOT NULL, typename TEXT NOT NULL, func BLOB NOT NULL REFERENCES func ON UPDATE CASCADE, @@ -234,7 +214,7 @@ CREATE TABLE func_input( ); CREATE INDEX FK_func_input_func ON func_input (func); CREATE TABLE func_output( - uuid BLOB PRIMARY KEY NOT NULL, + key BLOB PRIMARY KEY NOT NULL, name TEXT, typename TEXT NOT NULL, func BLOB NOT NULL REFERENCES func ON UPDATE CASCADE, @@ -250,7 +230,7 @@ CREATE INDEX FK_func_output_func ON func_output (func); -- is executed in the context of a "program". Within a program, tasks are -- typically evaluated in a serial manner. CREATE TABLE task( - uuid BLOB PRIMARY KEY NOT NULL, + key BLOB PRIMARY KEY NOT NULL, program BLOB NOT NULL REFERENCES program ON UPDATE CASCADE, -- func is NULL for some built-in functionality like "RECORD" programs @@ -261,7 +241,7 @@ CREATE INDEX FK_task_func ON task (func); -- A datum is an object that is computed as the output of a task, given as a -- literal value in a config file, or loaded from a file. CREATE TABLE datum( - uuid BLOB PRIMARY KEY NOT NULL, + key BLOB PRIMARY KEY NOT NULL, -- provider_type describes where the datum came from. Choices are: -- - COMPUTED: output of a decorated Function -- - IMPORTED: imported output from a prior program @@ -281,7 +261,7 @@ CREATE INDEX FK_datum_task ON datum (task); CREATE INDEX FK_datum_task_output ON datum (task_output); -- A task_input records the version of a Datum that is passed to a function CREATE TABLE task_input( - uuid BLOB PRIMARY KEY NOT NULL, + key BLOB PRIMARY KEY NOT NULL, task BLOB NOT NULL REFERENCES task ON UPDATE CASCADE, -- if this was a python function, reference which input func_input BLOB REFERENCES func_input ON UPDATE CASCADE, diff --git a/src/program.rs b/src/program.rs index c6a82f3..d5a0099 100644 --- a/src/program.rs +++ b/src/program.rs @@ -1,15 +1,16 @@ use log; -use rusqlite::{Connection, Error as RSQLError, Transaction}; +use rusqlite::{Connection, Error as RSError, Transaction}; extern crate derive_more; use uuid::Uuid; use std::time::{Instant, SystemTime}; +use crate::environment::{Environment}; use crate::timing; #[derive(Debug)] pub struct Task { - pub uuid: Uuid, + pub key: Uuid, pub start: Instant, pub end: Instant, } @@ -22,20 +23,43 @@ pub struct TaskInput { #[derive(Debug)] pub struct Program { - pub uuid: Uuid, + pub key: Uuid, pub name: String, pub message: String, + pub environment: Environment, } impl Program { - pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Program { - let u = Uuid::new_v4(); - log::debug!("New {} Program with UUID {}", name, u); - // TODO: Insert this program into the database - Program { - uuid: u, + pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Result { + let key = Uuid::new_v4(); + log::debug!("New {} Program with UUID {}", name, key); + + let env = Environment::current(None); + env.record(tx)?; + log::debug!("Environment: {:#?}", env); + + /* + start_time REAL, + end_time REAL, + + environment BLOB NOT NULL REFERENCES environment ON UPDATE CASCADE, + message TEXT NOT NULL -- user-defined message to help distinguish similar runs + */ + tx.execute( + "INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)", + ( + key.as_bytes(), + env.key.as_bytes(), + name, + message, + ), + )?; + + Ok(Program { + key: key, name: name.to_string(), message: message.to_string(), - } + environment: env, + }) } pub fn perform_task(self: &Program, inputs: &[TaskInput], f: F) -> R where @@ -44,7 +68,7 @@ impl Program { let start = Instant::now(); let u = Uuid::new_v4(); let mut task = Task { - uuid: u, + key: u, start: start, end: start, }; @@ -52,13 +76,22 @@ impl Program { task.end = Instant::now(); res } + pub fn record_timestamps(self: &Self, tx: &mut Transaction, start_stamp: f64, end_stamp: f64) -> + Result<(), RSError> { + tx.execute("UPDATE program SET start_time = ?1, end_time = ?2 WHERE key = ?3", + (start_stamp, end_stamp, self.key.as_bytes()), + )?; + Ok(()) + } } #[derive(Debug)] pub enum ProgramError { - CreateTransactionFailed(RSQLError), + CreateTransactionFailed(RSError), + NewProgramError(RSError), ProgramFailed(E), - CommitFailed(RSQLError), + RecordTimestampsFailed(RSError), + CommitFailed(RSError), } impl From for ProgramError { fn from(e: E) -> ProgramError { @@ -67,7 +100,7 @@ impl From for ProgramError { } /// Run a closure as a program, within a database transaction -pub fn with_program Result<(), E>>( +pub fn with_program Result<(), E>>( conn: &mut Connection, name: &str, message: &str, @@ -87,47 +120,42 @@ pub fn with_program Result<(), E>>( ); // start transaction - let txres = conn.transaction(); - match txres { - Err(e) => Err(ProgramError::CreateTransactionFailed(e)), - Ok(mut tx) => { - // start timer - log::debug!(target: log_target, "Starting timers"); - let start_st = SystemTime::now(); - let start = Instant::now(); - let start_stamp = timing::persistent_stamp(start, start, start_st); + let mut tx = conn.transaction() + .map_err(|e| { ProgramError::CreateTransactionFailed(e) })?; + // start timer + log::debug!(target: log_target, "Starting timers"); + let start_st = SystemTime::now(); + let start = Instant::now(); + let start_stamp = timing::persistent_stamp(start, start, start_st); - log::debug!( - target: log_target, - "Start time: {:?} f64 timestamp={:#?}", - start_st, - start_stamp - ); + log::debug!( + target: log_target, + "Start time: {:?} f64 timestamp={:#?}", + start_st, + start_stamp + ); - // Instantiate Program - // (record name and message for new program, get program ID) - let prog = Program::new(&mut tx, name, message); + // Instantiate Program + // (record name and message for new program, get program ID) + let prog = Program::new(&mut tx, name, message).map_err(|e| { ProgramError::NewProgramError(e) })?; - log::info!(target: log_target, "Running {:?}", prog); + log::info!(target: log_target, "Running {:?}", prog); - // run closure with program argument - f(prog, &mut tx)?; // if closure fails, transaction will be rolled back + // run closure with program argument + f(&prog, &mut tx)?; // if closure fails, transaction will be rolled back - // end timer - // Commit scope for RECORD program - let end = Instant::now(); - log::debug!( - target: log_target, - "Elapsed: {} seconds", - (end - start).as_secs_f64() - ); - // record end time - // commit transaction - log::debug!(target: log_target, "Committing transaction and finalizing program"); - match tx.commit() { - Err(e) => Err(ProgramError::CommitFailed(e)), - Ok(_) => Ok(()), - } - } - } + // end timer + // Commit scope for RECORD program + let end = Instant::now(); + log::debug!( + target: log_target, + "Elapsed: {} seconds", + (end - start).as_secs_f64() + ); + // record end time + let end_stamp = timing::persistent_stamp(end, start, start_st); + prog.record_timestamps(&mut tx, start_stamp, end_stamp).map_err(|e| {ProgramError::RecordTimestampsFailed(e) })?; + // commit transaction + log::debug!(target: log_target, "Committing transaction and finalizing program"); + tx.commit().map_err(|e| { ProgramError::CommitFailed(e) }) }