diff --git a/src/db.rs b/src/db.rs index 14c19a0..185f41c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -112,8 +112,10 @@ pub enum FindDatasetError { NoPathsProvided, #[error("Something went wrong when parsing paths")] PathError(std::io::Error), - #[error("None of the search paths belongs to an existing dataset. Returned path is the nearest - common ancestor of all searched paths (on same filesystem)")] + #[error( + "None of the search paths belongs to an existing dataset. Returned path is the nearest + common ancestor of all searched paths (on same filesystem)" + )] NoDataset(PathBuf), #[error("Some, but not all, search paths do not reside in an existing dataset")] SomeNotInDataset, diff --git a/src/environment.rs b/src/environment.rs index f47b606..e9ab56c 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -10,7 +10,7 @@ use std::io::{Error as IOError, Read}; /// Some simple namespaces to use for deriving UUID v5 keys const NAMESPACE_MACHINE: Uuid = Uuid::from_bytes([1; 16]); -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub struct Machine { pub key: Uuid, pub machine_id: String, @@ -47,7 +47,7 @@ impl Machine { } } -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub struct User { pub key: Uuid, pub machine: Machine, @@ -85,7 +85,7 @@ impl User { } } -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub struct Environment { pub key: Uuid, pub user: User, @@ -97,25 +97,17 @@ pub struct Environment { } impl Environment { pub fn current(addl_info: Option) -> Self { - let lor_result: Result = ( - || { - let mut s = String::new(); - let mut f = File::open("/etc/os-release")?; - f.read_to_string(&mut s)?; - Ok(s) - })(); + let lor_result: Result = (|| { + let mut s = String::new(); + let mut f = File::open("/etc/os-release")?; + f.read_to_string(&mut s)?; + Ok(s) + })(); let lor = lor_result.unwrap_or_else(|_| "Unknown".to_string()); let user = User::current(); let os_release = sys_info::os_release().unwrap_or_else(|_| "Unknown".to_string()); //let env_vars = HashMap::from_iter(env::vars()); - let key = Uuid::new_v5( - &user.key, - format!( - "{}\n{}", - os_release, - lor, - ).as_bytes(), - ); + let key = Uuid::new_v5(&user.key, format!("{}\n{}", os_release, lor,).as_bytes()); Environment { key, user, diff --git a/src/fs.rs b/src/fs.rs index c8945f3..e78399e 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -4,7 +4,7 @@ use jwalk::{Error as JWalkError, WalkDir}; use log; use rayon::prelude::*; use ring::digest::{Context, SHA256}; -use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes}; +use rusqlite::{types as rstypes, Error as RSError, Result as RSResult, ToSql, Transaction}; use thiserror::Error; use uuid::Uuid; @@ -17,10 +17,9 @@ use std::time::{Instant, SystemTime}; use crate::timing::persistent_stamp; - #[derive(Copy, Clone, Debug, PartialEq)] pub enum FileType { - Other = 0, // char/block devices, FIFOs, sockets... + Other = 0, // char/block devices, FIFOs, sockets... Regular = 1, Directory = 2, Symlink = 3, @@ -65,7 +64,7 @@ pub enum RecordError { TooBigDepthJump, } -#[derive(Copy,Clone,Debug,From)] +#[derive(Copy, Clone, Debug, From)] pub struct Hash256([u8; 32]); impl fmt::LowerHex for Hash256 { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -114,7 +113,6 @@ fn buffered_hash256(mut reader: R) -> IOResult<(Hash256, usize)> { Ok((hash.into(), num_bytes)) } - /// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for /// directories and insert entries into current_files table as we pass over them. This means /// computing the sha256 key, storing it along with parent, filetype, and symlink_target. @@ -122,7 +120,8 @@ fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), let mut insert_stmt = tx.prepare( "INSERT OR IGNORE INTO current_files (sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time) - VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")?; + VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + )?; let mut dirstack: Vec> = vec![]; let mut parent: Option = None; let mut lastkey: Option = None; @@ -133,15 +132,18 @@ fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), log::trace!("walk_and_insert {:?}", p); for entry_result in WalkDir::new(p) - .follow_links(false) - .skip_hidden(true) - .sort(true) { + .follow_links(false) + .skip_hidden(true) + .sort(true) + { let entry = entry_result?; let path = entry.path(); - let pathstr = path.to_str() + let pathstr = path + .to_str() .expect("Path will be convertable to a UTF-8 string"); let filetype = FileType::from(entry.file_type()); - let filename = path.file_name() + let filename = path + .file_name() .expect("Path will end in a named component (unlike / or foo/..)") .to_str() .expect("File name will be convertable to a UTF-8 string"); @@ -149,11 +151,13 @@ fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), let symlink_target = if filetype == FileType::Symlink { Some( std::fs::read_link(&path)? - .to_str() - .expect("Symlink target will be convertable to a UTF-8 string") - .to_string() - ) - } else { None }; + .to_str() + .expect("Symlink target will be convertable to a UTF-8 string") + .to_string(), + ) + } else { + None + }; if entry.depth > depth { // descending into directory @@ -178,7 +182,8 @@ fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), Some(Hash256(b)) => { ctx.update(&b[..]); } - None => { // Root. hash is the hash of the uuid for this dataset + None => { + // Root. hash is the hash of the uuid for this dataset ctx.update(&path_key.0); } } @@ -190,15 +195,15 @@ fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), let ver_uuid = Uuid::new_v4(); insert_stmt.execute(( - thiskey.0, - filename, - pathstr, - parent, - ver_uuid.as_bytes(), - filetype as u8, - symlink_target, - persistent_stamp(Instant::now(), start_instant, start_systemtime), - ))?; + thiskey.0, + filename, + pathstr, + parent, + ver_uuid.as_bytes(), + filetype as u8, + symlink_target, + persistent_stamp(Instant::now(), start_instant, start_systemtime), + ))?; lastkey = Some(thiskey); } Ok(()) @@ -210,31 +215,37 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> { log::trace!("insert_file_content_hashes"); // Extract all regular files (key and path) from current_files, in arbitrary order, and compute // content hashes for each, then update the corresponding entry in current_files. - let allpaths: Vec> = tx.prepare( - &format!( + let allpaths: Vec> = tx + .prepare(&format!( "SELECT id, filepath FROM current_files WHERE filetype == {}", - FileType::Regular as u8) - )?.query_map((), |row| { + FileType::Regular as u8 + ))? + .query_map((), |row| { let id = row.get::(0).unwrap(); let filepath = row.get::(1).unwrap(); - //println!("{} {}", id, filepath); + //println!("{} {}", id, filepath); Ok((id, filepath)) - }).unwrap().collect::>>(); + }) + .unwrap() + .collect::>>(); let start_systemtime = SystemTime::now(); let start_instant = Instant::now(); - let hashes: Vec> = allpaths.par_iter().map(|fpres| { - if let Ok((id, filepath)) = fpres { - let input = File::open(filepath)?; - let reader = BufReader::new(input); - let (chash, size_bytes) = buffered_hash256(reader)?; - Ok((*id, chash, size_bytes, Instant::now())) - } else { // should never happen - Ok((0_i32, Hash256([0; 32]), 0, Instant::now())) - } - }).collect(); + let hashes: Vec> = allpaths + .par_iter() + .map(|fpres| { + if let Ok((id, filepath)) = fpres { + let input = File::open(filepath)?; + let reader = BufReader::new(input); + let (chash, size_bytes) = buffered_hash256(reader)?; + Ok((*id, chash, size_bytes, Instant::now())) + } else { + // should never happen + Ok((0_i32, Hash256([0; 32]), 0, Instant::now())) + } + }) + .collect(); let mut update_stmt = tx.prepare( "UPDATE current_files SET @@ -242,7 +253,8 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> { size_bytes = ?3, recorded_time = ?4 WHERE - id = ?1")?; + id = ?1", + )?; for (id, hash, size_bytes, recorded_instant) in hashes.into_iter().flatten() { update_stmt.execute(( id, @@ -258,7 +270,7 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> { /// current_files, but whose parents _do_ appear. Create new versions marking these as deleted. fn find_deleted(_tx: &Transaction, _p: &Path, _path_key: Hash256) -> Result<(), RecordError> { // Do a CTE from filedir to get key and relpath for all files below p (inclusive) - + // For files in filedir but not in current_files, insert entries into temp table deleted_files // marking them as deleted. @@ -281,7 +293,7 @@ fn compute_current_directory_hashes(_tx: &Transaction) -> Result<(), RecordError // case we finalize this parent dir by recording its size and hash. Once a directory is // finalized, we can update its row in current_files. The recorded time of a directory should // be the max of all children recorded times. - + //let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new(); // Find all entries in current_files that declare parents that are not None and are not in @@ -294,8 +306,13 @@ fn compute_current_directory_hashes(_tx: &Transaction) -> Result<(), RecordError } /// Transfer rows in current_files and deleted_files into new rows in filedir_version -fn persist_temp_tables(tx: &Transaction, task_uuid: Uuid, dataset_uuid: Uuid) -> Result<(), RecordError> { - tx.execute(" +fn persist_temp_tables( + tx: &Transaction, + task_uuid: Uuid, + dataset_uuid: Uuid, +) -> Result<(), RecordError> { + tx.execute( + " INSERT OR IGNORE INTO filedir SELECT @@ -303,8 +320,9 @@ fn persist_temp_tables(tx: &Transaction, task_uuid: Uuid, dataset_uuid: Uuid) -> FROM current_files ", (dataset_uuid.as_bytes(),), - )?; - tx.execute(" + )?; + tx.execute( + " INSERT INTO filedir_version SELECT @@ -312,7 +330,7 @@ fn persist_temp_tables(tx: &Transaction, task_uuid: Uuid, dataset_uuid: Uuid) -> FROM current_files ", (task_uuid.as_bytes(),), - )?; + )?; Ok(()) } @@ -333,10 +351,11 @@ pub fn record( // get local dataset UUID let u = Uuid::parse_str( tx.prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid'")? - .query_row([], |row| { - let uuidstr: Result = row.get(0); - uuidstr - })?.as_str() + .query_row([], |row| { + let uuidstr: Result = row.get(0); + uuidstr + })? + .as_str(), )?; log::debug!("Found dataset UUID: {}", u); @@ -351,7 +370,8 @@ pub fn record( // This schema is like filedir joined with filedir_version // TODO: revert this to a TEMP TABLE after it's debugged - tx.execute("CREATE TEMP TABLE current_files ( + tx.execute( + "CREATE TEMP TABLE current_files ( id INTEGER PRIMARY KEY NOT NULL, -- only used in this table sha256 BLOB NOT NULL, -- will become the primary key on filedir name TEXT NOT NULL, -- filename without path @@ -365,7 +385,9 @@ pub fn record( size_bytes INTEGER, -- null for anything but files/dirs. For dirs, the sum of children content_sha256 BLOB, UNIQUE(sha256) -- prevent inserting same file twice - )", [])?; + )", + [], + )?; for p in paths { let p = p.canonicalize()?; @@ -377,8 +399,11 @@ pub fn record( // UUID to indicate the root directory, then taking the SHA256 of the parent plus filename, // as we descend into subdirectories until we reach p. let mut ancestors = LinkedList::new(); - for a in p.ancestors() { // fill a stack of ancestor dirs - if a.canonicalize()? == ds_root.canonicalize()? { break }; + for a in p.ancestors() { + // fill a stack of ancestor dirs + if a.canonicalize()? == ds_root.canonicalize()? { + break; + }; ancestors.push_front(a); } @@ -386,7 +411,8 @@ pub fn record( let mut prev_key = root_key; for a in ancestors { // hash of parent hash + filename - let filename = a.file_name() + let filename = a + .file_name() .ok_or_else(|| RecordError::CantGetFilename(a.to_owned()))? .to_str() .ok_or_else(|| RecordError::FilenameNotUTF8(a.to_owned()))?; @@ -412,7 +438,7 @@ pub fn record( compute_current_directory_hashes(tx)?; persist_temp_tables(tx, task_uuid, u)?; - + // Drop the temporary current_files table log::debug!("Dropping temp table current_files"); tx.execute("DROP TABLE current_files", ())?; diff --git a/src/main.rs b/src/main.rs index 3d844f0..cda6cd3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,7 @@ enum Command { #[arg(short, long)] name: String, /// The top level directory of the dataset (must be a directory) - #[arg(default_value=".")] + #[arg(default_value = ".")] dataset_path: PathBuf, }, /// Record changes to files/directories within a dataset (or create a new dataset) @@ -41,27 +41,22 @@ enum Command { } fn init_schema(conn: &mut Connection, name: &str) -> Result { - let dataset_uuid = nancy::db::init(conn, name) - .map_err(|e| anyhow!("failed to initialize schema: {}", e))?; + let dataset_uuid = + nancy::db::init(conn, name).map_err(|e| anyhow!("failed to initialize schema: {}", e))?; log::trace!("Init OK"); log::info!("Dataset UUID is {dataset_uuid}"); // Run an empty program so that the dataset log reflects when it was // initialized - nancy::program::with_program( - conn, - "INIT", - "Initialize dataset", - |prog| { - let _ = prog.perform_task(&[], |task| { - log::debug!("INIT task UUID is {}", task.key); - Ok::<(), ()>(()) - }); - let okres: Result<()> = Ok(()); - okres - }, - ) - .context("Could not run empty program during init_schema")??; + nancy::program::with_program(conn, "INIT", "Initialize dataset", |prog| { + let _ = prog.perform_task(&[], |task| { + log::debug!("INIT task UUID is {}", task.key); + Ok::<(), ()>(()) + }); + let okres: Result<()> = Ok(()); + okres + }) + .context("Could not run empty program during init_schema")??; Ok(dataset_uuid) } @@ -69,12 +64,18 @@ fn init_schema(conn: &mut Connection, name: &str) -> Result { /// Run init subcommand and return the return code fn init_cmd(name: &str, dataset_path: &Path) -> Result { if !dataset_path.is_dir() { - bail!("Path {:?} does not point to an existing directory", dataset_path); + bail!( + "Path {:?} does not point to an existing directory", + dataset_path + ); } let dbpath = &dataset_path.join("nancy.db"); if dbpath.exists() { - bail!("Database {:?} exists, indicating this dataset is already \ - initialized. Refusing to overwrite.", dbpath); + bail!( + "Database {:?} exists, indicating this dataset is already \ + initialized. Refusing to overwrite.", + dbpath + ); } log::info!("Initializing new database at {:?}", dbpath); let mut conn = Connection::open(dbpath) @@ -99,7 +100,8 @@ fn record_cmd(message: &str, record_paths: &Vec) -> Result<()> { let mut conn = Connection::open_with_flags( dbpath, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX, - ).context("Could not open existing SQLite database at {dbpath:?}: {e:?}")?; + ) + .context("Could not open existing SQLite database at {dbpath:?}: {e:?}")?; conn.pragma_update(None, "foreign_keys", &"ON")?; @@ -117,7 +119,8 @@ fn record_cmd(message: &str, record_paths: &Vec) -> Result<()> { task.key, ) }) - })??.map_err(|e| anyhow!("Record program failed: {}", e)) + })?? + .map_err(|e| anyhow!("Record program failed: {}", e)) } fn main() -> Result<()> { @@ -125,10 +128,9 @@ fn main() -> Result<()> { let args = Cli::parse(); match &args.command { - Command::Init { - name, - dataset_path - } => { init_cmd(name, dataset_path)?; }, + Command::Init { name, dataset_path } => { + init_cmd(name, dataset_path)?; + } Command::Record { message, record_paths, diff --git a/src/program.rs b/src/program.rs index 6edd662..822e3ae 100644 --- a/src/program.rs +++ b/src/program.rs @@ -32,22 +32,20 @@ impl Task { log::debug!("New Task with UUID {}", key); // NOTE: we currently do not save the "function" column - program.transaction.execute( - "INSERT INTO task VALUES (?1, ?2, NULL)", - ( - key.as_bytes(), - program.key.as_bytes(), - ), - ).map_err(TaskError::InsertTaskFailed)?; + program + .transaction + .execute( + "INSERT INTO task VALUES (?1, ?2, NULL)", + (key.as_bytes(), program.key.as_bytes()), + ) + .map_err(TaskError::InsertTaskFailed)?; - Ok(Task { - key, - }) + Ok(Task { key }) } } #[derive(Debug, Error)] -pub enum ProgramError{ +pub enum ProgramError { #[error("creating transaction failed: {0}")] CreateTransactionFailed(RSError), #[error("creating new program failed: {0}")] @@ -66,7 +64,7 @@ pub enum ProgramError{ PerformedTaskWhileNotRunning, } -#[derive(Clone,Debug,PartialEq)] +#[derive(Clone, Debug, PartialEq)] enum ProgramState { Initialized, Running, @@ -85,7 +83,11 @@ pub struct Program<'conn> { state: ProgramState, } impl<'conn> Program<'conn> { - pub fn new(tx: &'conn Transaction, name: &str, message: &str) -> Result, ProgramError> { + pub fn new( + tx: &'conn Transaction, + name: &str, + message: &str, + ) -> Result, ProgramError> { let log_target = &format!("nancy.program ({})", name); // start transaction @@ -116,19 +118,22 @@ impl<'conn> Program<'conn> { state: ProgramState::Initialized, }; - prog.environment.record(prog.transaction) + prog.environment + .record(prog.transaction) .map_err(ProgramError::RecordEnvFailed)?; log::debug!("Environment: {:#?}", prog.environment); - prog.transaction.execute( - "INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)", - ( - key.as_bytes(), - prog.environment.key.as_bytes(), - name, - message, - ), - ).map_err(ProgramError::InsertProgramFailed)?; + prog.transaction + .execute( + "INSERT INTO program VALUES (?1, ?2, ?3, ?4, NULL, NULL)", + ( + key.as_bytes(), + prog.environment.key.as_bytes(), + name, + message, + ), + ) + .map_err(ProgramError::InsertProgramFailed)?; Ok(prog) } @@ -155,15 +160,14 @@ impl<'conn> Program<'conn> { "Elapsed: {} seconds", (end - self.start_instant).as_secs_f64() ); - let end_stamp = timing::persistent_stamp( - end, - self.start_instant, - self.start_systemtime, - ); + let end_stamp = timing::persistent_stamp(end, self.start_instant, self.start_systemtime); - self.transaction.execute("UPDATE program SET end_time = ?1 WHERE key = ?2", - (end_stamp, self.key.as_bytes()), - ).map_err(ProgramError::RecordTimestampsFailed)?; + self.transaction + .execute( + "UPDATE program SET end_time = ?1 WHERE key = ?2", + (end_stamp, self.key.as_bytes()), + ) + .map_err(ProgramError::RecordTimestampsFailed)?; Ok(()) } @@ -172,7 +176,10 @@ impl<'conn> Drop for Program<'conn> { /// Checks that the program has been cleaned up by setting state to Finished fn drop(&mut self) { if self.state != ProgramState::Finished { - log::error!("Program reached destructor with unfinished state={:?}", self.state); + log::error!( + "Program reached destructor with unfinished state={:?}", + self.state + ); } } } @@ -185,7 +192,8 @@ pub fn with_program( f: F, ) -> Result, ProgramError> where - F: FnOnce(&mut Program) -> Result { + F: FnOnce(&mut Program) -> Result, +{ // NOTE: for Errors outside of f(prog), we should not rely on ?, so that we can keep the // From instance for ProgramError as general as possible. Instead, we should manually check // errors in the surrounding code in this function and only use ? for f(prog). @@ -199,8 +207,9 @@ where message ); - let tx = conn.transaction() - .map_err(|e| { ProgramError::CreateTransactionFailed(e) })?; + let tx = conn + .transaction() + .map_err(|e| ProgramError::CreateTransactionFailed(e))?; let mut prog = Program::new(&tx, name, message)?; @@ -215,15 +224,18 @@ where prog.record_timestamps()?; // commit transaction - log::debug!(target: log_target, "Committing transaction and finalizing program"); + log::debug!( + target: log_target, + "Committing transaction and finalizing program" + ); prog.state = ProgramState::Finished; log::debug!(target: log_target, "Set prog.state to Finished"); drop(prog); // stop borrowing tx, since tx.commit() will consume tx log::debug!(target: log_target, "Dropped prog"); - tx.commit().map_err(|e| { ProgramError::CommitFailed(e) })?; + tx.commit().map_err(|e| ProgramError::CommitFailed(e))?; } - + Ok(ret) } diff --git a/tests/record.rs b/tests/record.rs index e352ef0..096b696 100644 --- a/tests/record.rs +++ b/tests/record.rs @@ -1,11 +1,11 @@ use assert_cmd::prelude::*; -use assert_fs::prelude::*; -use predicates::prelude::*; +//use assert_fs::prelude::*; +//use predicates::prelude::*; use std::fs; use std::io::Write; -use std::path::{Path}; -use std::process::{Command}; +use std::path::Path; +use std::process::Command; fn empty_dir() -> Result { let dir = assert_fs::TempDir::new()?; @@ -15,7 +15,8 @@ fn empty_dir() -> Result { fn init_dir(dir: &Path) -> Result<(), assert_cmd::cargo::CargoError> { let mut cmd = Command::cargo_bin("nancy")?; cmd.arg("init") - .arg("--name").arg("Temporary data directory") + .arg("--name") + .arg("Temporary data directory") .arg(dir) .assert() .success(); @@ -28,12 +29,8 @@ fn add_some_files(dir: &Path) -> Result<(), Box> { let bazpath = subdir.join("baz.txt"); fs::create_dir_all(subdir)?; let mut bazfile = fs::File::create(bazpath)?; - write!( - bazfile, - "{}", - "bat", - )?; - + write!(bazfile, "{}", "bat",)?; + Ok(()) } @@ -41,7 +38,8 @@ fn add_some_files(dir: &Path) -> Result<(), Box> { fn init_missing_dir() -> Result<(), Box> { let mut cmd = Command::cargo_bin("nancy")?; cmd.arg("init") - .arg("--name").arg("Missing data directory") + .arg("--name") + .arg("Missing data directory") .arg("/path/to/missing/dir") .assert() .failure(); @@ -49,7 +47,6 @@ fn init_missing_dir() -> Result<(), Box> { Ok(()) } - #[test] fn init_empty_dir() -> Result<(), Box> { let dir = empty_dir()?; @@ -61,8 +58,10 @@ fn init_empty_dir() -> Result<(), Box> { fn record_dir(dir: &Path) -> Result { let mut cmd = Command::cargo_bin("nancy")?; - Ok(cmd.arg("record") - .arg("--message").arg("Test recording") + Ok(cmd + .arg("record") + .arg("--message") + .arg("Test recording") .arg(dir) .assert() .success())