use derive_more::{From}; use jwalk::{Error as JWalkError, WalkDir, WalkDirGeneric}; use log; use rayon::prelude::*; use ring::digest::{Context, SHA256}; use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes}; use uuid::{Uuid}; use std::collections::{LinkedList}; use std::fmt; use std::fs::{File}; use std::io::{BufReader, Error as IOError, Read, Result as IOResult}; use std::path::{Path, PathBuf}; use std::time::{Instant, SystemTime}; use crate::timing::{persistent_stamp}; #[derive(Copy, Clone, Debug, PartialEq)] pub enum FileType { Other = 0, // char/block devices, FIFOs, sockets... Regular = 1, Directory = 2, Symlink = 3, } impl From for FileType { fn from(ft: std::fs::FileType) -> Self { if ft.is_file() { FileType::Regular } else if ft.is_dir() { FileType::Directory } else if ft.is_symlink() { FileType::Symlink } else { FileType::Other } } } impl fmt::Display for FileType { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{:?}", self); Ok(()) } } #[derive(Debug)] struct FSVersion { uuid: Uuid, //content_sha256: [u8; 32], //filename: String, //parent: [u8; 32], } impl FSVersion { fn from_file(path: &Path, dataset_uuid: Uuid, dataset_root: &Path) -> FSVersion { let u = Uuid::new_v4(); //let hasher = Sha256::new(); FSVersion { uuid: u, //filename: "", //content_sha256: hasher.finalize(), } } } #[derive(Debug, From)] pub enum RecordError { SQLError(RSError), IOError(IOError), DirectoryWalkError(JWalkError), UUIDParseError(uuid::Error), CantGetFilename, FilenameNotUTF8, ParentHashNotSet, TooBigDepthJump, NotImplemented, } #[derive(Clone,Debug)] struct FileInfo { parent_sha256: Option< [u8; 32] >, content_hash: [u8; 32], } impl Default for FileInfo { fn default() -> Self { FileInfo { parent_sha256: None, content_hash: [0; 32], } } } #[derive(Copy,Clone,Debug,From)] pub struct Hash256([u8; 32]); impl fmt::LowerHex for Hash256 { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { for byte in self.0 { write!(f, "{:x}", byte); } Ok(()) } } impl fmt::UpperHex for Hash256 { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { for byte in self.0 { write!(f, "{:X}", byte); } Ok(()) } } impl fmt::Display for Hash256 { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { for byte in self.0 { write!(f, "{:x}", byte); } Ok(()) } } impl ToSql for Hash256 { fn to_sql(&self) -> RSResult> { Ok(rstypes::ToSqlOutput::from(&self.0[..])) } } fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> { let mut ctx = Context::new(&SHA256); let mut buffer = [0; 1024 * 128]; let mut num_bytes: usize = 0; loop { let count = reader.read(&mut buffer)?; if count == 0 { break; } ctx.update(&buffer[..count]); num_bytes += count; } let mut hash: [u8; 32] = [0; 32]; hash.clone_from_slice(ctx.finish().as_ref()); Ok((hash.into(), num_bytes)) } /// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for /// directories and insert entries into current_files table as we pass over them. This means /// computing the sha256 key, storing it along with parent, filetype, and symlink_target. fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> { let mut insert_stmt = tx.prepare( "INSERT OR IGNORE INTO current_files (sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time) VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")?; let mut dirstack: Vec> = vec![]; let mut parent: Option = None; let mut lastkey: Option = None; let mut depth: usize = 0; let start_systemtime = SystemTime::now(); let start_instant = Instant::now(); log::trace!("walk_and_insert {:?}", p); for entry_result in WalkDir::new(p) .follow_links(false) .skip_hidden(true) .sort(true) { let entry = entry_result?; let path = entry.path(); let pathstr = path.to_str() .expect("Path will be convertable to a UTF-8 string"); let filetype = FileType::from(entry.file_type()); let filename = path.file_name() .expect("Path will end in a named component (unlike / or foo/..)") .to_str() .expect("File name will be convertable to a UTF-8 string"); let symlink_target = if filetype == FileType::Symlink { Some( std::fs::read_link(&path)? .to_str() .expect("Symlink target will be convertable to a UTF-8 string") .to_string() ) } else { None }; if entry.depth > depth { // descending into directory if entry.depth != depth + 1 { // we should never descend two or more steps at a time return Err(RecordError::TooBigDepthJump); } //log::debug!("Down"); dirstack.push(parent); parent = lastkey; depth += 1; } while depth > entry.depth { // finished processing subdirectory, going back up the stack //log::debug!("Up"); parent = dirstack.pop().unwrap(); depth -= 1; } let mut ctx = Context::new(&SHA256); match parent { Some(Hash256(b)) => { ctx.update(&b[..]); } None => { // Root. hash is the hash of the uuid for this dataset ctx.update(&path_key.0); } } ctx.update(filename.as_bytes()); let mut thiskey: [u8; 32] = [0; 32]; thiskey.clone_from_slice(ctx.finish().as_ref()); let thiskey = Hash256(thiskey); let ver_uuid = Uuid::new_v4(); insert_stmt.execute(( thiskey.0, filename, pathstr, parent, ver_uuid.as_bytes(), filetype as u8, symlink_target, persistent_stamp(Instant::now(), start_instant, start_systemtime), ))?; lastkey = Some(thiskey); } Ok(()) } /// Compute content hashes for all files in the current_files temp table. Symlinks and "Other" /// types will have NULL hashes, and directory hashes will be computed in a separate step. fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> { log::trace!("insert_file_content_hashes"); // Extract all regular files (key and path) from current_files, in arbitrary order, and compute // content hashes for each, then update the corresponding entry in current_files. let allpaths: Vec> = tx.prepare( &format!( "SELECT id, filepath FROM current_files WHERE filetype == {}", FileType::Regular as u8) )?.query_map((), |row| { let id = row.get::(0).unwrap(); let filepath = row.get::(1).unwrap(); //println!("{} {}", id, filepath); Ok((id, filepath)) }).unwrap().collect::>>(); let start_systemtime = SystemTime::now(); let start_instant = Instant::now(); let hashes: Vec> = allpaths.par_iter().map(|fpres| { if let Ok((id, filepath)) = fpres { let input = File::open(filepath)?; let reader = BufReader::new(input); let (chash, size_bytes) = buffered_hash256(reader)?; Ok((*id, chash, size_bytes, Instant::now())) } else { // should never happen Ok((0 as i32, Hash256([0; 32]), 0, Instant::now())) } }).collect(); let mut update_stmt = tx.prepare( "UPDATE current_files SET content_sha256 = ?2, size_bytes = ?3, recorded_time = ?4 WHERE id = ?1")?; for hashresult in hashes { if let Ok((id, hash, size_bytes, recorded_instant)) = hashresult { update_stmt.execute(( id, hash, size_bytes, persistent_stamp(recorded_instant, start_instant, start_systemtime), ))?; } } Ok(()) } pub fn record( tx: &mut Transaction, paths: &[PathBuf], ds_root: &Path, message: &str, //task_uuid: Uuid, ) -> Result<(), RecordError> { log::info!( "Recording path {:?} for dataset at {:?} with user-provided message \"{}\"", paths, ds_root, message, ); // get local dataset UUID let u = Uuid::parse_str( tx.prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid'")? .query_row([], |row| { let uuidstr: Result = row.get(0); uuidstr })?.as_str() )?; log::debug!("Found dataset UUID: {}", u); // compute dataset root key. This is the SHA256 of the UUID let mut ctx = Context::new(&SHA256); ctx.update(u.as_bytes()); let mut root_key: [u8; 32] = [0; 32]; root_key.clone_from_slice(ctx.finish().as_ref()); let root_key = Hash256(root_key); log::info!("Root key is {}", root_key); // This schema is like filedir joined with filedir_version // TODO: revert this to a TEMP TABLE after it's debugged tx.execute("CREATE -- TEMP TABLE current_files ( id INTEGER PRIMARY KEY NOT NULL, -- only used in this table sha256 BLOB NOT NULL, -- will become the primary key on filedir name TEXT NOT NULL, -- filename without path filepath TEXT NOT NULL, -- local path to file parent BLOB, -- hash referencing either filedir or this table version_uuid BLOB NOT NULL, -- generated UUID v4 for this version recorded_time REAL, -- float64 representation of unix timestamp filetype INTEGER, -- corresponds to the FileType enum -- deleted BOOL NOT NULL, -- deleted is always FALSE for this table symlink_target TEXT, size_bytes INTEGER, -- null for anything but files/dirs. For dirs, the sum of children content_sha256 BLOB, UNIQUE(sha256) -- prevent inserting same file twice )", [])?; for p in paths { let p = p.canonicalize()?; let mut ctx = Context::new(&SHA256); ctx.update(u.as_bytes()); // manually compute the SHA256 key for this path, starting with the SHA256 of the dataset // UUID to indicate the root directory, then taking the SHA256 of the parent plus filename, // as we descend into subdirectories until we reach p. let mut ancestors = LinkedList::new(); for a in p.ancestors() { // fill a stack of ancestor dirs if a.canonicalize()? == ds_root.canonicalize()? { break }; ancestors.push_front(a); } let mut hashbuf: [u8; 32] = [0; 32]; let mut prev_key = root_key; for a in ancestors { // hash of parent hash + filename let filename = a.file_name() .ok_or(RecordError::CantGetFilename)? .to_str().ok_or(RecordError::FilenameNotUTF8)?; let mut ctx = Context::new(&SHA256); ctx.update(&prev_key.0); ctx.update(&filename.as_bytes()); hashbuf.clone_from_slice(ctx.finish().as_ref()); prev_key = Hash256(hashbuf); } walk_and_insert(tx, &p, prev_key)?; // SINGLE-THREADED } insert_file_content_hashes(tx)?; // MULTI-THREADED // SINGLE-THREADED // Extract files from current_files, in insertion order. For each row, if it's not a // directory compare parent column to see whether we've changed to a new parent, in which // case we finalize this parent dir by recording its size and hash. Once a directory is // finalized, we can update its row in current_files. The recorded time of a directory // should be the max of all children recorded times. let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new(); // Find latest entries in filedir_version that are not deleted, and that do not appear in // current_files, but whose parents _do_ appear. Create new versions marking these as deleted. // For each entry in current_files, create a new filedir_version. // Find all entries in current_files that declare parents that are not None and are not in // current_files. Each of these parents must be added with a new version. The content_hash must // be derived from the existing and new file versions, which should be represented in // filedir_version at this point. Add these in depth-first order. // Drop the temporary current_files table Ok(()) }