diff --git a/Cargo.lock b/Cargo.lock index 02d7e05..c2d628b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,6 +22,18 @@ dependencies = [ "memchr", ] +[[package]] +name = "arrayref" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" + +[[package]] +name = "arrayvec" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" + [[package]] name = "atty" version = "0.2.14" @@ -33,12 +45,32 @@ dependencies = [ "winapi", ] +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + [[package]] name = "bitflags" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "blake3" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", + "digest", +] + [[package]] name = "block-buffer" version = "0.10.3" @@ -48,6 +80,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" + +[[package]] +name = "cc" +version = "1.0.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "581f5dba903aac52ea3feb5ec4810848460ee833876f1f9b0fdeab1f19091574" + [[package]] name = "cfg-if" version = "1.0.0" @@ -91,6 +135,12 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + [[package]] name = "convert_case" version = "0.4.0" @@ -98,12 +148,70 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" [[package]] -name = "cpufeatures" -version = "0.2.5" +name = "crossbeam" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" dependencies = [ - "libc", + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f916dfc5d356b0ed9dae65f1db9fc9770aa2851d2662b988ccf4fe3516e86348" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd42583b04998a5363558e5f9291ee5a5ff6b49944332103f251e7479a82aa7" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edbafec5fa1f196ca66527c1b12c2ec4745ca14b50f1ad8f9f6f720b55d11fac" +dependencies = [ + "cfg-if", ] [[package]] @@ -137,8 +245,15 @@ checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" dependencies = [ "block-buffer", "crypto-common", + "subtle", ] +[[package]] +name = "either" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" + [[package]] name = "env_logger" version = "0.9.1" @@ -224,6 +339,25 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "js-sys" +version = "0.3.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "jwalk" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "172752e853a067cbce46427de8470ddf308af7fd8ceaf9b682ef31a5021b6bb9" +dependencies = [ + "crossbeam", + "rayon", +] + [[package]] name = "libc" version = "0.2.135" @@ -255,21 +389,43 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg", +] + [[package]] name = "nancy" version = "0.1.0" dependencies = [ + "blake3", "clap", "derive_more", "env_logger", + "jwalk", "log", "once_cell", + "rayon", + "ring", "rusqlite", "rusqlite_migration", - "sha2", "uuid", ] +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.15.0" @@ -330,6 +486,30 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rayon" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + [[package]] name = "regex" version = "1.6.0" @@ -347,6 +527,21 @@ version = "0.6.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rusqlite" version = "0.28.0" @@ -381,35 +576,42 @@ dependencies = [ "semver", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "semver" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" -[[package]] -name = "sha2" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest", -] - [[package]] name = "smallvec" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + [[package]] name = "syn" version = "1.0.102" @@ -442,6 +644,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "uuid" version = "1.2.1" @@ -469,6 +677,70 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" + +[[package]] +name = "web-sys" +version = "0.3.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 49bc2a0..b0595ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,12 +18,15 @@ name = "nancy" path = "src/main.rs" [dependencies] +blake3 = "1.3.1" clap = { version = "4.0.14", features = ["derive"] } derive_more = "0.99.17" env_logger = "0.9.1" +jwalk = "0.6.0" log = "0.4.17" once_cell = "1.15.0" +rayon = "1.5.3" +ring = "0.16.20" rusqlite = { version = "0.28.0", features = ["uuid"] } rusqlite_migration = "1.0.0" -sha2 = "0.10.6" uuid = { version = "1.2.1", features = ["v4"] } diff --git a/src/fs.rs b/src/fs.rs index e2db9a8..b6ab52c 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1,18 +1,407 @@ -//use jwalk; +use derive_more::{From}; +use jwalk::{Error as JWalkError, WalkDir, WalkDirGeneric}; use log; -//use sha2::{digest::FixedOutput, Digest, Sha256}; -use std::path::Path; +use rayon::prelude::*; +use ring::digest::{Context, SHA256}; +use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes}; +use uuid::{Uuid}; + +use std::collections::{LinkedList}; +use std::fmt; +use std::fs::{File}; +use std::io::{BufReader, Error as IOError, Read, Result as IOResult}; +use std::path::{Path, PathBuf}; +use std::time::{Instant, SystemTime}; + +use crate::timing::{persistent_stamp}; + + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum FileType { + Other = 0, // char/block devices, FIFOs, sockets... + Regular = 1, + Directory = 2, + Symlink = 3, +} +impl From for FileType { + fn from(ft: std::fs::FileType) -> Self { + if ft.is_file() { + FileType::Regular + } else if ft.is_dir() { + FileType::Directory + } else if ft.is_symlink() { + FileType::Symlink + } else { + FileType::Other + } + } +} +impl fmt::Display for FileType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self); + Ok(()) + } +} + #[derive(Debug)] +struct FSVersion { + uuid: Uuid, + //content_sha256: [u8; 32], + //filename: String, + //parent: [u8; 32], +} +impl FSVersion { + fn from_file(path: &Path, dataset_uuid: Uuid, dataset_root: &Path) -> FSVersion { + let u = Uuid::new_v4(); + //let hasher = Sha256::new(); + FSVersion { + uuid: u, + //filename: "", + //content_sha256: hasher.finalize(), + } + } +} + + +#[derive(Debug, From)] pub enum RecordError { + SQLError(RSError), + IOError(IOError), + DirectoryWalkError(JWalkError), + UUIDParseError(uuid::Error), + CantGetFilename, + FilenameNotUTF8, + ParentHashNotSet, + TooBigDepthJump, NotImplemented, } -pub fn record(path: &Path, message: &String) -> Result<(), RecordError> { - log::info!( - "Recording path {:?} with user-provided message \"{}\"", - path, - message - ); - Err(RecordError::NotImplemented) +#[derive(Clone,Debug)] +struct FileInfo { + parent_sha256: Option< + [u8; 32] + >, + content_hash: [u8; 32], +} +impl Default for FileInfo { + fn default() -> Self { + FileInfo { + parent_sha256: None, + content_hash: [0; 32], + } + } +} + +#[derive(Copy,Clone,Debug,From)] +pub struct Hash256([u8; 32]); +impl fmt::LowerHex for Hash256 { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for byte in self.0 { + write!(f, "{:x}", byte); + } + Ok(()) + } +} +impl fmt::UpperHex for Hash256 { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for byte in self.0 { + write!(f, "{:X}", byte); + } + Ok(()) + } +} +impl fmt::Display for Hash256 { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for byte in self.0 { + write!(f, "{:x}", byte); + } + Ok(()) + } +} +impl ToSql for Hash256 { + fn to_sql(&self) -> RSResult> { + Ok(rstypes::ToSqlOutput::from(&self.0[..])) + } +} + +fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> { + let mut ctx = Context::new(&SHA256); + let mut buffer = [0; 1024 * 128]; + let mut num_bytes: usize = 0; + loop { + let count = reader.read(&mut buffer)?; + if count == 0 { + break; + } + ctx.update(&buffer[..count]); + num_bytes += count; + } + let mut hash: [u8; 32] = [0; 32]; + hash.clone_from_slice(ctx.finish().as_ref()); + Ok((hash.into(), num_bytes)) +} + + +/// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for +/// directories and insert entries into current_files table as we pass over them. This means +/// computing the sha256 key, storing it along with parent, filetype, and symlink_target. +fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> { + let mut insert_stmt = tx.prepare( + "INSERT OR IGNORE INTO current_files + (sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time) + VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")?; + let mut dirstack: Vec> = vec![]; + let mut parent: Option = None; + let mut lastkey: Option = None; + let mut depth: usize = 0; + + let start_systemtime = SystemTime::now(); + let start_instant = Instant::now(); + + log::trace!("walk_and_insert {:?}", p); + for entry_result in WalkDir::new(p) + .follow_links(false) + .skip_hidden(true) + .sort(true) { + let entry = entry_result?; + let path = entry.path(); + let pathstr = path.to_str() + .expect("Path will be convertable to a UTF-8 string"); + let filetype = FileType::from(entry.file_type()); + let filename = path.file_name() + .expect("Path will end in a named component (unlike / or foo/..)") + .to_str() + .expect("File name will be convertable to a UTF-8 string"); + + log::debug!("{:?} : finding target", path); + let symlink_target = if filetype == FileType::Symlink { + Some( + std::fs::read_link(&path)? + .to_str() + .expect("Symlink target will be convertable to a UTF-8 string") + .to_string() + ) + } else { None }; + log::debug!("{:?} : target = {:?}", path, symlink_target); + + if entry.depth > depth { + // descending into directory + if entry.depth != depth + 1 { + // we should never descend two or more steps at a time + return Err(RecordError::TooBigDepthJump); + } + //log::debug!("Down"); + dirstack.push(parent); + parent = lastkey; + depth += 1; + } + while depth > entry.depth { + // finished processing subdirectory, going back up the stack + //log::debug!("Up"); + parent = dirstack.pop().unwrap(); + depth -= 1; + + log::debug!( + " parent={} ed={} d={}", + &(match parent { + Some(h) => format!("{}", h), + None => "None".to_string(), + })[..8], + entry.depth, + depth, + ); + } + + let mut ctx = Context::new(&SHA256); + match parent { + Some(Hash256(b)) => { + ctx.update(&b[..]); + } + None => { // Root. hash is the hash of the uuid for this dataset + ctx.update(&path_key.0); + } + } + ctx.update(filename.as_bytes()); + let mut thiskey: [u8; 32] = [0; 32]; + thiskey.clone_from_slice(ctx.finish().as_ref()); + let thiskey = Hash256(thiskey); + + let ver_uuid = Uuid::new_v4(); + + insert_stmt.execute(( + thiskey.0, + filename, + pathstr, + parent, + ver_uuid.as_bytes(), + filetype as u8, + symlink_target, + persistent_stamp(Instant::now(), start_instant, start_systemtime), + ))?; + lastkey = Some(thiskey); + } + Ok(()) +} + +/// Compute content hashes for all files in the current_files temp table. Symlinks and "Other" +/// types will have NULL hashes, and directory hashes will be computed in a separate step. +fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> { + log::trace!("insert_file_content_hashes"); + // Extract all regular files (key and path) from current_files, in arbitrary order, and compute + // content hashes for each, then update the corresponding entry in current_files. + let allpaths: Vec> = tx.prepare( + &format!( + "SELECT id, filepath FROM current_files WHERE filetype == {}", + FileType::Regular as u8) + )?.query_map((), |row| { + let id = row.get::(0).unwrap(); + let filepath = row.get::(1).unwrap(); + //println!("{} {}", id, filepath); + Ok((id, filepath)) + }).unwrap().collect::>>(); + + let start_systemtime = SystemTime::now(); + let start_instant = Instant::now(); + + let hashes: Vec> = allpaths.par_iter().map(|fpres| { + if let Ok((id, filepath)) = fpres { + let input = File::open(filepath)?; + let mut reader = BufReader::new(input); + let (chash, size_bytes) = buffered_hash256(reader)?; + Ok((*id, chash, size_bytes, Instant::now())) + } else { // should never happen + Ok((0 as i32, Hash256([0; 32]), 0, Instant::now())) + } + }).collect(); + let mut update_stmt = tx.prepare( + "UPDATE current_files + SET + content_sha256 = ?2, + size_bytes = ?3, + recorded_time = ?4 + WHERE + id = ?1")?; + for hashresult in hashes { + if let Ok((id, hash, size_bytes, recorded_instant)) = hashresult { + update_stmt.execute(( + id, + hash, + size_bytes, + persistent_stamp(recorded_instant, start_instant, start_systemtime), + ))?; + } + } + Ok(()) +} + +pub fn record( + tx: &mut Transaction, + paths: &[PathBuf], + ds_root: &Path, + message: &str, + //task_uuid: Uuid, +) -> Result<(), RecordError> { + log::info!( + "Recording path {:?} for dataset at {:?} with user-provided message \"{}\"", + paths, + ds_root, + message, + ); + + // get local dataset UUID + let u = Uuid::parse_str( + tx.prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid'")? + .query_row([], |row| { + let uuidstr: Result = row.get(0); + uuidstr + })?.as_str() + )?; + log::debug!("Found dataset UUID: {}", u); + + // compute dataset root key. This is the SHA256 of the UUID + let mut ctx = Context::new(&SHA256); + ctx.update(u.as_bytes()); + let mut root_key: [u8; 32] = [0; 32]; + root_key.clone_from_slice(ctx.finish().as_ref()); + let root_key = Hash256(root_key); + + log::info!("Root key is {}", root_key); + + // This schema is like filedir joined with filedir_version + // TODO: revert this to a TEMP TABLE after it's debugged + tx.execute("CREATE + -- TEMP + TABLE current_files ( + id INTEGER PRIMARY KEY NOT NULL, -- only used in this table + sha256 BLOB NOT NULL, -- will become the primary key on filedir + name TEXT NOT NULL, -- filename without path + filepath TEXT NOT NULL, -- local path to file + parent BLOB, -- hash referencing either filedir or this table + version_uuid BLOB NOT NULL, -- generated UUID v4 for this version + recorded_time REAL, -- float64 representation of unix timestamp + filetype INTEGER, -- corresponds to the FileType enum + -- deleted BOOL NOT NULL, -- deleted is always FALSE for this table + symlink_target TEXT, + size_bytes INTEGER, -- null for anything but files/dirs. For dirs, the sum of children + content_sha256 BLOB, + UNIQUE(sha256) -- prevent inserting same file twice + )", [])?; + + for p in paths { + let p = p.canonicalize()?; + + let mut ctx = Context::new(&SHA256); + ctx.update(u.as_bytes()); + + // manually compute the SHA256 key for this path, starting with the SHA256 of the dataset + // UUID to indicate the root directory, then taking the SHA256 of the parent plus filename, + // as we descend into subdirectories until we reach p. + let mut ancestors = LinkedList::new(); + for a in p.ancestors() { // fill a stack of ancestor dirs + if a.canonicalize()? == ds_root.canonicalize()? { break }; + ancestors.push_front(a); + } + + let mut hashbuf: [u8; 32] = [0; 32]; + let mut prev_key = root_key; + for a in ancestors { + // hash of parent hash + filename + let filename = a.file_name() + .ok_or(RecordError::CantGetFilename)? + .to_str().ok_or(RecordError::FilenameNotUTF8)?; + let mut ctx = Context::new(&SHA256); + ctx.update(&prev_key.0); + ctx.update(&filename.as_bytes()); + hashbuf.clone_from_slice(ctx.finish().as_ref()); + prev_key = Hash256(hashbuf); + } + + walk_and_insert(tx, &p, prev_key)?; // SINGLE-THREADED + } + + insert_file_content_hashes(tx)?; // MULTI-THREADED + + // SINGLE-THREADED + // Extract files from current_files, in insertion order. For each row, if it's not a + // directory compare parent column to see whether we've changed to a new parent, in which + // case we finalize this parent dir by recording its size and hash. Once a directory is + // finalized, we can update its row in current_files. The recorded time of a directory + // should be the max of all children recorded times. + let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new(); + + // Find latest entries in filedir_version that are not deleted, and that do not appear in + // current_files, but whose parents _do_ appear. Create new versions marking these as deleted. + + // For each entry in current_files, create a new filedir_version. + + // Find all entries in current_files that declare parents that are not None and are not in + // current_files. Each of these parents must be added with a new version. The content_hash must + // be derived from the existing and new file versions, which should be represented in + // filedir_version at this point. Add these in depth-first order. + + // Drop the temporary current_files table + + Ok(()) } diff --git a/src/main.rs b/src/main.rs index 768a567..5d4dddd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,8 @@ use clap::{Parser, Subcommand}; use rusqlite::{Connection, OpenFlags}; +use uuid::{Uuid}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::process; // Composable provenance tracking for scientific data analysis @@ -9,17 +10,24 @@ use std::process; #[command(author, version, about, long_about = None, arg_required_else_help=true)] struct Cli { #[command(subcommand)] - command: Option, + command: Option, } #[derive(Subcommand)] -enum Commands { +enum Command { + /// Initialize a new dataset + #[command()] + Init { + /// A short descriptive name for the dataset + #[arg(short, long)] + name: String, + /// The top level directory of the dataset (must be a directory) + #[arg(default_value=".")] + dataset_path: PathBuf, + }, /// Record changes to files/directories within a dataset (or create a new dataset) #[command()] Record { - /// If not within an existing dataset, create one - #[arg(short, long)] - initialize: bool, /// A short descriptive message for this recording, i.e. "Re-run with lr=1e-3" #[arg(short, long)] message: String, @@ -35,12 +43,54 @@ enum Commands { }, } +fn init_schema(conn: &mut Connection) -> Uuid { + match nancy::db::init(conn) { + Err(e) => { + log::error!("Encountered error in initializing schema: {:?}", e); + process::exit(1); + } + Ok(dataset_uuid) => { + log::trace!("Init OK"); + log::info!("Dataset UUID is {dataset_uuid}"); + // Run an empty program so that the dataset log reflects when it was + // initialized + nancy::program::with_program( + conn, + "INIT", + "Initialize dataset", + |prog, _tx| { + let _ = prog.perform_task(&[], |task| { + log::debug!("INIT task UUID is {}", task.uuid); + }); + let okres: Result<(), ()> = Ok(()); + okres + }, + ) + .expect("Empty program should not throw error"); + dataset_uuid + } + } +} + +fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) -> () { + if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog, tx| { + prog.perform_task(&[], |_task| { + // Note that this may fail, in which case we should roll back only this program + // but keep the dataset initialized. + nancy::fs::record(tx, paths, &dataset_path, message) + }) + }) { + log::error!("Encountered error in RECORD program: {:?}", e); + process::exit(1); + }; +} + fn main() { env_logger::init(); let args = Cli::parse(); match &args.command { - Some(Commands::Hello {}) => { + Some(Command::Hello {}) => { println!("Hello from nancy (binary)!"); match nancy::print_uuid() { Ok(_) => { @@ -51,63 +101,55 @@ fn main() { } }; } - Some(Commands::Record { - initialize, + Some(Command::Init { + name, + dataset_path + }) => { + if !dataset_path.is_dir() { + log::error!("Path {:?} does not point to an existing directory", dataset_path); + process::exit(1); + } + let dbpath = &dataset_path.join("nancy.db"); + if dbpath.exists() { + log::error!("Database {:?} exists, indicating this dataset is already \ + initialized. Refusing to overwrite.", dbpath); + process::exit(1); + } + log::info!("Initializing new database at {:?}", dbpath); + let mut conn = match Connection::open(dbpath) { + Err(e) => { + log::error!( + "Could not open new SQLite database at {dbpath:?}: {:?}", + e + ); + process::exit(1); + } + Ok(cc) => cc, + }; + conn.pragma_update(None, "foreign_keys", &"ON").unwrap(); + let u = init_schema(&mut conn); + do_record( + &mut conn, + &format!("Initial record of dataset {:?}", u), + &[dataset_path.to_path_buf()], + dataset_path, + ); + } + Some(Command::Record { message, record_paths, }) => { // If no paths are given, use ["."] for the following steps. // Determine dataset dir (ds_dir) - let mut conn = match nancy::db::find_dataset_dir(record_paths) { + match nancy::db::find_dataset_dir(record_paths) { Err(nancy::db::FindDatasetError::NoDataset(path)) => { log::info!("No dataset at or above nearest ancestor path: {:?}", path); - if !initialize { - log::error!( - "Refusing to initialize a new dataset at {path:?}. \ - Pass the -i or --initialize flag to request initialization." - ); - process::exit(1); - } - let dbpath = &path.join("nancy.db"); - log::info!("Initializing new database at {:?}", dbpath); - let mut c = match Connection::open(dbpath) { - Err(e) => { - log::error!( - "Could not open new SQLite database at {dbpath:?}: {:?}", - e - ); - process::exit(1); - } - Ok(cc) => cc, - }; - c.pragma_update(None, "foreign_keys", &"ON").unwrap(); - match nancy::db::init(&mut c) { - Err(e) => { - log::error!("Encountered error in initializing schema: {:?}", e); - process::exit(1); - } - Ok(dataset_uuid) => { - log::trace!("Init OK"); - log::info!("Dataset UUID is {dataset_uuid}"); - // Run an empty program so that the dataset log reflects when it was - // initialized - nancy::program::with_program( - &mut c, - "INIT", - "Initialize dataset", - |prog| { - let _ = prog.perform_task(&[], |task| { - log::debug!("INIT task UUID is {}", task.uuid); - }); - let okres: Result<(), ()> = Ok(()); - okres - }, - ) - .expect("Empty program should not throw error"); - } - } - c + log::error!( + "Refusing to initialize a new dataset at {path:?}. \ + Use `nancy init` to create a dataset instead." + ); + process::exit(1); } Err(e) => { log::error!("Could not determine dataset directory: {:?}", e); @@ -118,7 +160,7 @@ fn main() { log::info!("Found existing dataset at path: {:?}", path); let dbpath = &path.join("nancy.db"); // open with flags to prevent creating when we believe the db exists - let mut c = match Connection::open_with_flags( + let mut conn = match Connection::open_with_flags( dbpath, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX, ) { @@ -130,8 +172,8 @@ fn main() { } Ok(cc) => cc, }; - c.pragma_update(None, "foreign_keys", &"ON").unwrap(); - match nancy::db::ensure_schema(&mut c) { + conn.pragma_update(None, "foreign_keys", &"ON").unwrap(); + match nancy::db::ensure_schema(&mut conn) { Err(e) => { log::error!("Error ensuring schema: {}", e); process::exit(1); @@ -140,23 +182,12 @@ fn main() { log::debug!("Schema ensured: {:?}", update_result); } } - c + do_record(&mut conn, message, record_paths.as_slice(), &path); } }; - if let Err(e) = nancy::program::with_program(&mut conn, "RECORD", message, |prog| { - prog.perform_task(&[], |_task| { - let dataset_path = PathBuf::from("."); - // Note that this may fail, in which case we should roll back only this program - // but keep the dataset initialized. - nancy::fs::record(&dataset_path, message) - }) - }) { - log::error!("Encountered error in RECORD program: {:?}", e); - process::exit(1); - }; } - Some(Commands::Status {}) => { + Some(Command::Status {}) => { println!("status not yet implemented"); } None => {} diff --git a/src/program.rs b/src/program.rs index a3720fa..c6a82f3 100644 --- a/src/program.rs +++ b/src/program.rs @@ -1,7 +1,6 @@ use log; use rusqlite::{Connection, Error as RSQLError, Transaction}; extern crate derive_more; -use derive_more::From; use uuid::Uuid; use std::time::{Instant, SystemTime}; @@ -31,6 +30,7 @@ impl Program { pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Program { let u = Uuid::new_v4(); log::debug!("New {} Program with UUID {}", name, u); + // TODO: Insert this program into the database Program { uuid: u, name: name.to_string(), @@ -67,7 +67,7 @@ impl From for ProgramError { } /// Run a closure as a program, within a database transaction -pub fn with_program Result<(), E>>( +pub fn with_program Result<(), E>>( conn: &mut Connection, name: &str, message: &str, @@ -95,7 +95,14 @@ pub fn with_program Result<(), E>>( log::debug!(target: log_target, "Starting timers"); let start_st = SystemTime::now(); let start = Instant::now(); - let start_stamp = timing::persistent_stamp(Instant::now(), start, start_st); + let start_stamp = timing::persistent_stamp(start, start, start_st); + + log::debug!( + target: log_target, + "Start time: {:?} f64 timestamp={:#?}", + start_st, + start_stamp + ); // Instantiate Program // (record name and message for new program, get program ID) @@ -104,17 +111,11 @@ pub fn with_program Result<(), E>>( log::info!(target: log_target, "Running {:?}", prog); // run closure with program argument - f(prog)?; // if closure fails, transaction will be rolled back + f(prog, &mut tx)?; // if closure fails, transaction will be rolled back // end timer // Commit scope for RECORD program let end = Instant::now(); - log::debug!( - target: log_target, - "Start time: {:?} f64 timestamp={:#?}", - start_st, - start_stamp - ); log::debug!( target: log_target, "Elapsed: {} seconds", @@ -122,6 +123,7 @@ pub fn with_program Result<(), E>>( ); // record end time // commit transaction + log::debug!(target: log_target, "Committing transaction and finalizing program"); match tx.commit() { Err(e) => Err(ProgramError::CommitFailed(e)), Ok(_) => Ok(()),