Persistent machine/user/env/program save to db

This commit is contained in:
Jacob Hinkle 2022-11-11 09:02:42 -05:00
parent dc172789db
commit 69b17a33eb
8 changed files with 312 additions and 114 deletions

49
Cargo.lock generated
View File

@ -383,6 +383,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "machine-uid"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f1595709b0a7386bcd56ba34d250d626e5503917d05d32cdccddcd68603e212"
dependencies = [
"winreg",
]
[[package]]
name = "memchr"
version = "2.5.0"
@ -408,12 +417,15 @@ dependencies = [
"env_logger",
"jwalk",
"log",
"machine-uid",
"once_cell",
"rayon",
"ring",
"rusqlite",
"rusqlite_migration",
"sys-info",
"uuid",
"whoami",
]
[[package]]
@ -588,6 +600,12 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]]
name = "smallvec"
version = "1.10.0"
@ -623,6 +641,16 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sys-info"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b3a0d0aba8bf96a0e1ddfdc352fc53b3df7f39318c71854910c3c4b024ae52c"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "termcolor"
version = "1.1.3"
@ -657,6 +685,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83"
dependencies = [
"getrandom",
"sha1_smol",
]
[[package]]
@ -741,6 +770,17 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "whoami"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6631b6a2fd59b1841b622e8f1a7ad241ef0a46f2d580464ce8140ac94cbd571"
dependencies = [
"bumpalo",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "winapi"
version = "0.3.9"
@ -771,3 +811,12 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "winreg"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2986deb581c4fe11b621998a5e53361efe6b48a151178d0cd9eeffa4dc6acc9"
dependencies = [
"winapi",
]

View File

@ -5,6 +5,7 @@ edition = "2021"
authors = ["Jacob Hinkle <jacob.hinkle@jhink.org>"]
description = "Composable provenance tracking for scientific data analysis"
repository = "https://git.jhink.org/jacob/nancy"
license = "BSD-3-Clause"
readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -24,9 +25,12 @@ derive_more = "0.99.17"
env_logger = "0.9.1"
jwalk = "0.6.0"
log = "0.4.17"
machine-uid = "0.2.0"
once_cell = "1.15.0"
rayon = "1.5.3"
ring = "0.16.20"
rusqlite = { version = "0.28.0", features = ["uuid"] }
rusqlite_migration = "1.0.0"
uuid = { version = "1.2.1", features = ["v4"] }
sys-info = "0.9.1"
uuid = { version = "1.2.1", features = ["v4", "v5"] }
whoami = "1.2.3"

146
src/environment.rs Normal file
View File

@ -0,0 +1,146 @@
use machine_uid;
use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes};
use sys_info;
use uuid::{Uuid};
use whoami;
use std::collections::{HashMap};
use std::env;
use std::fs::{File};
use std::io::{Error as IOError, Read};
/// Some simple namespaces to use for deriving UUID v5 keys
const NAMESPACE_MACHINE: Uuid = Uuid::from_bytes([1; 16]);
#[derive(Debug)]
pub struct Machine {
pub key: Uuid,
pub machine_id: String,
pub hostname: String,
pub os_type: String,
}
impl Machine {
pub fn current() -> Self {
let hostname = sys_info::hostname().unwrap_or("Unknown".to_string());
let mid = machine_uid::get().unwrap_or("Unknown".to_string());
let os_type = sys_info::os_type().unwrap_or("Unknown".to_string());
let key = Uuid::new_v5(
&NAMESPACE_MACHINE,
format!("{},{},{}", hostname, mid, os_type).as_bytes(),
);
Machine {
key: key,
machine_id: mid,
hostname: hostname,
os_type: os_type,
}
}
pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> {
tx.execute(
"INSERT OR IGNORE INTO machine VALUES (?1, ?2, ?3, ?4)",
(
&self.key.as_bytes(),
&self.machine_id,
&self.hostname,
&self.os_type,
),
)?;
Ok(())
}
}
#[derive(Debug)]
pub struct User {
pub key: Uuid,
pub machine: Machine,
pub username: String,
pub realname: String,
}
impl User {
pub fn current() -> Self {
let machine = Machine::current();
let username = whoami::username();
let realname = whoami::realname();
let key = Uuid::new_v5(
&machine.key,
format!("{}\n{}", username, realname).as_bytes(),
);
User {
key: key,
machine: machine,
username: username,
realname: realname,
}
}
pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> {
self.machine.record(tx);
tx.execute(
"INSERT OR IGNORE INTO user VALUES (?1, ?2, ?3, ?4)",
(
&self.key.as_bytes(),
&self.machine.key.as_bytes(),
&self.username,
&self.realname,
),
)?;
Ok(())
}
}
#[derive(Debug)]
pub struct Environment {
pub key: Uuid,
pub user: User,
pub os_release: String,
//pub timezone: String,
pub linux_os_release: String,
//pub env_vars: HashMap<String, String>,
pub additional_info: Option<String>,
}
impl Environment {
pub fn current(addl_info: Option<String>) -> Self {
let lor_result: Result<String, IOError> = (
|| {
let mut s = String::new();
let mut f = File::open("/etc/os-release")?;
f.read_to_string(&mut s)?;
Ok(s)
})();
let lor = lor_result.unwrap_or("Unknown".to_string());
let user = User::current();
let os_release = sys_info::os_release().unwrap_or("Unknown".to_string());
//let env_vars = HashMap::from_iter(env::vars());
let key = Uuid::new_v5(
&user.key,
format!(
"{}\n{}",
os_release,
lor,
).as_bytes(),
);
Environment {
key: key,
user: user,
os_release: os_release,
linux_os_release: lor,
//env_vars: env_vars,
additional_info: addl_info,
}
}
//fn load(tx: &Transaction, key: Uuid) -> Self {
//}
pub fn record(self: &Self, tx: &mut Transaction) -> RSResult<()> {
self.user.record(tx);
tx.execute(
"INSERT OR IGNORE INTO environment VALUES (?1, ?2, ?3, ?4, NULL, ?5)",
(
&self.key.as_bytes(),
&self.user.key.as_bytes(),
&self.os_release,
&self.linux_os_release,
&self.additional_info,
),
)?;
Ok(())
}
}

View File

@ -174,7 +174,6 @@ fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<
.to_str()
.expect("File name will be convertable to a UTF-8 string");
log::debug!("{:?} : finding target", path);
let symlink_target = if filetype == FileType::Symlink {
Some(
std::fs::read_link(&path)?
@ -183,7 +182,6 @@ fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<
.to_string()
)
} else { None };
log::debug!("{:?} : target = {:?}", path, symlink_target);
if entry.depth > depth {
// descending into directory
@ -201,16 +199,6 @@ fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<
//log::debug!("Up");
parent = dirstack.pop().unwrap();
depth -= 1;
log::debug!(
" parent={} ed={} d={}",
&(match parent {
Some(h) => format!("{}", h),
None => "None".to_string(),
})[..8],
entry.depth,
depth,
);
}
let mut ctx = Context::new(&SHA256);
@ -268,7 +256,7 @@ fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> {
let hashes: Vec<IOResult<(i32, Hash256, usize, Instant)>> = allpaths.par_iter().map(|fpres| {
if let Ok((id, filepath)) = fpres {
let input = File::open(filepath)?;
let mut reader = BufReader::new(input);
let reader = BufReader::new(input);
let (chash, size_bytes) = buffered_hash256(reader)?;
Ok((*id, chash, size_bytes, Instant::now()))
} else { // should never happen

View File

@ -5,6 +5,7 @@ use uuid::Uuid;
//pub mod data;
pub mod db;
pub mod environment;
pub mod fs;
pub mod program;
pub mod timing;

View File

@ -54,19 +54,21 @@ fn init_schema(conn: &mut Connection) -> Uuid {
log::info!("Dataset UUID is {dataset_uuid}");
// Run an empty program so that the dataset log reflects when it was
// initialized
nancy::program::with_program(
if let Err(e) = nancy::program::with_program(
conn,
"INIT",
"Initialize dataset",
|prog, _tx| {
let _ = prog.perform_task(&[], |task| {
log::debug!("INIT task UUID is {}", task.uuid);
log::debug!("INIT task UUID is {}", task.key);
});
let okres: Result<(), ()> = Ok(());
okres
},
)
.expect("Empty program should not throw error");
) {
log::error!("Empty program error: {e:?}");
process::exit(1);
}
dataset_uuid
}
}
@ -130,7 +132,7 @@ fn main() {
let u = init_schema(&mut conn);
do_record(
&mut conn,
&format!("Initial record of dataset {:?}", u),
&format!("Initial recording of dataset {:?}", u),
&[dataset_path.to_path_buf()],
dataset_path,
);

View File

@ -30,25 +30,22 @@ CREATE TABLE triggers(
-- type are included but not the kernel version. Software that changes due to
-- updates should be included in the "environment" table instead.
CREATE TABLE machine(
sha256 BLOB PRIMARY KEY NOT NULL,
key BLOB PRIMARY KEY NOT NULL,
machine_id TEXT, -- platform-dependent unique hardware id
-- Linux: open('/etc/machine-id', 'r').read() (assumes systemd)
-- OSX: `ioreg -rd1 -c IOPlatformExpertDevice | grep IOPlatformUUID | awk '{$print $3}' | tr -d \"`
-- Windows: `reg query HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Cryptography /v MachineGuid`
hostname TEXT, -- platform.node(): 'lucky'
system TEXT, -- platform.system(): 'Linux'
cpu_type TEXT, -- platform.machine(): 'x86_64'
processor TEXT -- platform.processor():
hostname TEXT, -- e.g. 'lucky'
os_type TEXT -- e.g. 'Linux'
);
-- Programs are run by users on machines
CREATE TABLE user(
sha256 BLOB PRIMARY KEY NOT NULL,
key BLOB PRIMARY KEY NOT NULL,
machine BLOB NOT NULL REFERENCES machine ON UPDATE CASCADE,
username TEXT NOT NULL, -- getpass.getuser()
userid INTEGER, -- os.getuid()
fullname TEXT, -- on Linux/OSX: pwd.getpwuid(os.getuid()).pw_gecos
realname TEXT -- on Linux/OSX: pwd.getpwuid(os.getuid()).pw_gecos
-- on Windows: see https://stackoverflow.com/questions/21766954/how-to-get-windows-users-full-name-in-python
machine BLOB NOT NULL REFERENCES machine ON UPDATE CASCADE
);
CREATE INDEX FK_user_machine ON user (machine);
@ -68,7 +65,7 @@ CREATE INDEX FK_user_machine ON user (machine);
-- https://datatracker.ietf.org/doc/html/rfc4122.html
-- This is possible in Python by simply calling uuid.uuid4() with no arguments
CREATE TABLE dataset (
uuid BLOB PRIMARY KEY NOT NULL -- UUID generated by uuid.uuid4()
key BLOB PRIMARY KEY NOT NULL -- UUID generated by uuid.uuid4()
);
@ -104,7 +101,7 @@ END;
-- course dependent on filetype). Each version has a number, and was provided by
-- some program (and potentially a datum).
CREATE TABLE filedir_version (
uuid BLOB PRIMARY KEY NOT NULL,
key BLOB PRIMARY KEY NOT NULL,
filedir BLOB NOT NULL
REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
@ -146,27 +143,13 @@ END; END;
-- python executable being used, and environment variables are recorded here.
-- Other info is available in the parent "machine" table.
CREATE TABLE environment (
sha256 BLOB PRIMARY KEY NOT NULL,
envvars_json TEXT, -- json.dumps(dict(os.environ), sort_keys=True)
python_implementation TEXT, -- platform.python_implementation(): 'cpython'
python_strversion TEXT, -- sys.version: '3.9.7 (default, Sep 16 2021, 13:09:58) \n[GCC 7.5.0]'
python_hexversion INTEGER, -- sys.hexversion: 50923504
user TEXT REFERENCES user ON UPDATE CASCADE,
key BLOB PRIMARY KEY NOT NULL,
user BLOB NOT NULL REFERENCES user ON UPDATE CASCADE,
os_release TEXT,
linux_os_release TEXT,
timezone TEXT, -- timezone, for interpreting event times
platform_release TEXT, -- platform.release(): '5.15.64'
freedesktop_os_release TEXT, -- requires python 3.10
-- platform.freedesktop_os_release() as JSON
-- "{'NAME': 'NixOS', 'ID': 'nixos',
-- 'PRETTY_NAME': 'NixOS 22.05 (Quokka)', 'BUG_REPORT_URL':
-- 'https://github.com/NixOS/nixpkgs/issues', 'BUILD_ID':
-- '22.05.20220902.67e4507', 'DOCUMENTATION_URL':
-- 'https://nixos.org/learn.html', 'HOME_URL': 'https://nixos.org/', 'LOGO':
-- 'nix-snowflake', 'SUPPORT_URL': 'https://nixos.org/community.html',
-- 'VERSION': '22.05 (Quokka)', 'VERSION_CODENAME': 'quokka', 'VERSION_ID':
-- '22.05'}"
win32_ver TEXT, -- platform.win32_ver() as JSON
mac_ver TEXT -- platform.mac_ver() as JSON
--envvars_json TEXT
additional_info TEXT -- other env context, possibly provided by a language wrapper (e.g. python version info)
);
@ -176,18 +159,15 @@ CREATE TABLE environment (
-- programs are automatically imported and merged when possible when loading a
-- "datum" from disk.
CREATE TABLE program (
uuid BLOB PRIMARY KEY NOT NULL,
key BLOB PRIMARY KEY NOT NULL,
environment BLOB NOT NULL REFERENCES environment ON UPDATE CASCADE,
name TEXT, -- name of the program, usually written lowercase by calling code e.g. cnn_crossval
-- Names of built-in operations will be shown in upper case: e.g. 'FREEZE'
message TEXT NOT NULL, -- user-defined message to help distinguish similar runs
-- we use POSIX timestamps for time recording.
-- e.g. datetime.datetime.now().timestamp()
start_time REAL,
end_time REAL,
process_id INTEGER, -- host PID of python process on host OS
environment BLOB NOT NULL REFERENCES environment ON UPDATE CASCADE,
message TEXT NOT NULL -- user-defined message to help distinguish similar runs
end_time REAL
);
CREATE INDEX FK_program_environment ON program (environment);
@ -221,7 +201,7 @@ CREATE TABLE func(
);
CREATE INDEX FK_func_module ON func (module);
CREATE TABLE func_input(
uuid BLOB PRIMARY KEY NOT NULL,
key BLOB PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
typename TEXT NOT NULL,
func BLOB NOT NULL REFERENCES func ON UPDATE CASCADE,
@ -234,7 +214,7 @@ CREATE TABLE func_input(
);
CREATE INDEX FK_func_input_func ON func_input (func);
CREATE TABLE func_output(
uuid BLOB PRIMARY KEY NOT NULL,
key BLOB PRIMARY KEY NOT NULL,
name TEXT,
typename TEXT NOT NULL,
func BLOB NOT NULL REFERENCES func ON UPDATE CASCADE,
@ -250,7 +230,7 @@ CREATE INDEX FK_func_output_func ON func_output (func);
-- is executed in the context of a "program". Within a program, tasks are
-- typically evaluated in a serial manner.
CREATE TABLE task(
uuid BLOB PRIMARY KEY NOT NULL,
key BLOB PRIMARY KEY NOT NULL,
program BLOB NOT NULL REFERENCES program ON UPDATE CASCADE,
-- func is NULL for some built-in functionality like "RECORD" programs
@ -261,7 +241,7 @@ CREATE INDEX FK_task_func ON task (func);
-- A datum is an object that is computed as the output of a task, given as a
-- literal value in a config file, or loaded from a file.
CREATE TABLE datum(
uuid BLOB PRIMARY KEY NOT NULL,
key BLOB PRIMARY KEY NOT NULL,
-- provider_type describes where the datum came from. Choices are:
-- - COMPUTED: output of a decorated Function
-- - IMPORTED: imported output from a prior program
@ -281,7 +261,7 @@ CREATE INDEX FK_datum_task ON datum (task);
CREATE INDEX FK_datum_task_output ON datum (task_output);
-- A task_input records the version of a Datum that is passed to a function
CREATE TABLE task_input(
uuid BLOB PRIMARY KEY NOT NULL,
key BLOB PRIMARY KEY NOT NULL,
task BLOB NOT NULL REFERENCES task ON UPDATE CASCADE,
-- if this was a python function, reference which input
func_input BLOB REFERENCES func_input ON UPDATE CASCADE,

View File

@ -1,15 +1,16 @@
use log;
use rusqlite::{Connection, Error as RSQLError, Transaction};
use rusqlite::{Connection, Error as RSError, Transaction};
extern crate derive_more;
use uuid::Uuid;
use std::time::{Instant, SystemTime};
use crate::environment::{Environment};
use crate::timing;
#[derive(Debug)]
pub struct Task {
pub uuid: Uuid,
pub key: Uuid,
pub start: Instant,
pub end: Instant,
}
@ -22,20 +23,43 @@ pub struct TaskInput {
#[derive(Debug)]
pub struct Program {
pub uuid: Uuid,
pub key: Uuid,
pub name: String,
pub message: String,
pub environment: Environment,
}
impl Program {
pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Program {
let u = Uuid::new_v4();
log::debug!("New {} Program with UUID {}", name, u);
// TODO: Insert this program into the database
Program {
uuid: u,
pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Result<Program, RSError> {
let key = Uuid::new_v4();
log::debug!("New {} Program with UUID {}", name, key);
let env = Environment::current(None);
env.record(tx)?;
log::debug!("Environment: {:#?}", env);
/*
start_time REAL,
end_time REAL,
environment BLOB NOT NULL REFERENCES environment ON UPDATE CASCADE,
message TEXT NOT NULL -- user-defined message to help distinguish similar runs
*/
tx.execute(
"INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)",
(
key.as_bytes(),
env.key.as_bytes(),
name,
message,
),
)?;
Ok(Program {
key: key,
name: name.to_string(),
message: message.to_string(),
}
environment: env,
})
}
pub fn perform_task<R, F>(self: &Program, inputs: &[TaskInput], f: F) -> R
where
@ -44,7 +68,7 @@ impl Program {
let start = Instant::now();
let u = Uuid::new_v4();
let mut task = Task {
uuid: u,
key: u,
start: start,
end: start,
};
@ -52,13 +76,22 @@ impl Program {
task.end = Instant::now();
res
}
pub fn record_timestamps(self: &Self, tx: &mut Transaction, start_stamp: f64, end_stamp: f64) ->
Result<(), RSError> {
tx.execute("UPDATE program SET start_time = ?1, end_time = ?2 WHERE key = ?3",
(start_stamp, end_stamp, self.key.as_bytes()),
)?;
Ok(())
}
}
#[derive(Debug)]
pub enum ProgramError<E> {
CreateTransactionFailed(RSQLError),
CreateTransactionFailed(RSError),
NewProgramError(RSError),
ProgramFailed(E),
CommitFailed(RSQLError),
RecordTimestampsFailed(RSError),
CommitFailed(RSError),
}
impl<E> From<E> for ProgramError<E> {
fn from(e: E) -> ProgramError<E> {
@ -67,7 +100,7 @@ impl<E> From<E> for ProgramError<E> {
}
/// Run a closure as a program, within a database transaction
pub fn with_program<E, F: FnOnce(Program, &mut Transaction) -> Result<(), E>>(
pub fn with_program<E, F: FnOnce(&Program, &mut Transaction) -> Result<(), E>>(
conn: &mut Connection,
name: &str,
message: &str,
@ -87,10 +120,8 @@ pub fn with_program<E, F: FnOnce(Program, &mut Transaction) -> Result<(), E>>(
);
// start transaction
let txres = conn.transaction();
match txres {
Err(e) => Err(ProgramError::CreateTransactionFailed(e)),
Ok(mut tx) => {
let mut tx = conn.transaction()
.map_err(|e| { ProgramError::CreateTransactionFailed(e) })?;
// start timer
log::debug!(target: log_target, "Starting timers");
let start_st = SystemTime::now();
@ -106,12 +137,12 @@ pub fn with_program<E, F: FnOnce(Program, &mut Transaction) -> Result<(), E>>(
// Instantiate Program
// (record name and message for new program, get program ID)
let prog = Program::new(&mut tx, name, message);
let prog = Program::new(&mut tx, name, message).map_err(|e| { ProgramError::NewProgramError(e) })?;
log::info!(target: log_target, "Running {:?}", prog);
// run closure with program argument
f(prog, &mut tx)?; // if closure fails, transaction will be rolled back
f(&prog, &mut tx)?; // if closure fails, transaction will be rolled back
// end timer
// Commit scope for RECORD program
@ -122,12 +153,9 @@ pub fn with_program<E, F: FnOnce(Program, &mut Transaction) -> Result<(), E>>(
(end - start).as_secs_f64()
);
// record end time
let end_stamp = timing::persistent_stamp(end, start, start_st);
prog.record_timestamps(&mut tx, start_stamp, end_stamp).map_err(|e| {ProgramError::RecordTimestampsFailed(e) })?;
// commit transaction
log::debug!(target: log_target, "Committing transaction and finalizing program");
match tx.commit() {
Err(e) => Err(ProgramError::CommitFailed(e)),
Ok(_) => Ok(()),
}
}
}
tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })
}