nancyrs/src/fs.rs

450 lines
15 KiB
Rust

use derive_more::{From};
use jwalk::{Error as JWalkError, WalkDir, WalkDirGeneric};
use log;
use rayon::prelude::*;
use ring::digest::{Context, SHA256};
use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes};
use uuid::{Uuid};
use std::collections::{LinkedList};
use std::fmt;
use std::fs::{File};
use std::io::{BufReader, Error as IOError, Read, Result as IOResult};
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime};
use crate::timing::{persistent_stamp};
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum FileType {
Other = 0, // char/block devices, FIFOs, sockets...
Regular = 1,
Directory = 2,
Symlink = 3,
}
impl From<std::fs::FileType> for FileType {
fn from(ft: std::fs::FileType) -> Self {
if ft.is_file() {
FileType::Regular
} else if ft.is_dir() {
FileType::Directory
} else if ft.is_symlink() {
FileType::Symlink
} else {
FileType::Other
}
}
}
impl fmt::Display for FileType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self);
Ok(())
}
}
#[derive(Debug)]
struct FSVersion {
uuid: Uuid,
//content_sha256: [u8; 32],
//filename: String,
//parent: [u8; 32],
}
impl FSVersion {
fn from_file(path: &Path, dataset_uuid: Uuid, dataset_root: &Path) -> FSVersion {
let u = Uuid::new_v4();
//let hasher = Sha256::new();
FSVersion {
uuid: u,
//filename: "",
//content_sha256: hasher.finalize(),
}
}
}
#[derive(Debug, From)]
pub enum RecordError {
SQLError(RSError),
IOError(IOError),
DirectoryWalkError(JWalkError),
UUIDParseError(uuid::Error),
CantGetFilename,
FilenameNotUTF8,
ParentHashNotSet,
TooBigDepthJump,
NotImplemented,
}
#[derive(Clone,Debug)]
struct FileInfo {
parent_sha256: Option<
[u8; 32]
>,
content_hash: [u8; 32],
}
impl Default for FileInfo {
fn default() -> Self {
FileInfo {
parent_sha256: None,
content_hash: [0; 32],
}
}
}
#[derive(Copy,Clone,Debug,From)]
pub struct Hash256([u8; 32]);
impl fmt::LowerHex for Hash256 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for byte in self.0 {
write!(f, "{:x}", byte);
}
Ok(())
}
}
impl fmt::UpperHex for Hash256 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for byte in self.0 {
write!(f, "{:X}", byte);
}
Ok(())
}
}
impl fmt::Display for Hash256 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for byte in self.0 {
write!(f, "{:x}", byte);
}
Ok(())
}
}
impl ToSql for Hash256 {
fn to_sql(&self) -> RSResult<rstypes::ToSqlOutput<'_>> {
Ok(rstypes::ToSqlOutput::from(&self.0[..]))
}
}
fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> {
let mut ctx = Context::new(&SHA256);
let mut buffer = [0; 1024 * 128];
let mut num_bytes: usize = 0;
loop {
let count = reader.read(&mut buffer)?;
if count == 0 {
break;
}
ctx.update(&buffer[..count]);
num_bytes += count;
}
let mut hash: [u8; 32] = [0; 32];
hash.clone_from_slice(ctx.finish().as_ref());
Ok((hash.into(), num_bytes))
}
/// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for
/// directories and insert entries into current_files table as we pass over them. This means
/// computing the sha256 key, storing it along with parent, filetype, and symlink_target.
fn walk_and_insert(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
let mut insert_stmt = tx.prepare(
"INSERT OR IGNORE INTO current_files
(sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time)
VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")?;
let mut dirstack: Vec<Option<Hash256>> = vec![];
let mut parent: Option<Hash256> = None;
let mut lastkey: Option<Hash256> = None;
let mut depth: usize = 0;
let start_systemtime = SystemTime::now();
let start_instant = Instant::now();
log::trace!("walk_and_insert {:?}", p);
for entry_result in WalkDir::new(p)
.follow_links(false)
.skip_hidden(true)
.sort(true) {
let entry = entry_result?;
let path = entry.path();
let pathstr = path.to_str()
.expect("Path will be convertable to a UTF-8 string");
let filetype = FileType::from(entry.file_type());
let filename = path.file_name()
.expect("Path will end in a named component (unlike / or foo/..)")
.to_str()
.expect("File name will be convertable to a UTF-8 string");
let symlink_target = if filetype == FileType::Symlink {
Some(
std::fs::read_link(&path)?
.to_str()
.expect("Symlink target will be convertable to a UTF-8 string")
.to_string()
)
} else { None };
if entry.depth > depth {
// descending into directory
if entry.depth != depth + 1 {
// we should never descend two or more steps at a time
return Err(RecordError::TooBigDepthJump);
}
//log::debug!("Down");
dirstack.push(parent);
parent = lastkey;
depth += 1;
}
while depth > entry.depth {
// finished processing subdirectory, going back up the stack
//log::debug!("Up");
parent = dirstack.pop().unwrap();
depth -= 1;
}
let mut ctx = Context::new(&SHA256);
match parent {
Some(Hash256(b)) => {
ctx.update(&b[..]);
}
None => { // Root. hash is the hash of the uuid for this dataset
ctx.update(&path_key.0);
}
}
ctx.update(filename.as_bytes());
let mut thiskey: [u8; 32] = [0; 32];
thiskey.clone_from_slice(ctx.finish().as_ref());
let thiskey = Hash256(thiskey);
let ver_uuid = Uuid::new_v4();
insert_stmt.execute((
thiskey.0,
filename,
pathstr,
parent,
ver_uuid.as_bytes(),
filetype as u8,
symlink_target,
persistent_stamp(Instant::now(), start_instant, start_systemtime),
))?;
lastkey = Some(thiskey);
}
Ok(())
}
/// Compute content hashes for all files in the current_files temp table. Symlinks and "Other"
/// types will have NULL hashes, and directory hashes will be computed in a separate step.
fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> {
log::trace!("insert_file_content_hashes");
// Extract all regular files (key and path) from current_files, in arbitrary order, and compute
// content hashes for each, then update the corresponding entry in current_files.
let allpaths: Vec<RSResult<(i32, String)>> = tx.prepare(
&format!(
"SELECT id, filepath FROM current_files WHERE filetype == {}",
FileType::Regular as u8)
)?.query_map((), |row| {
let id = row.get::<usize, i32>(0).unwrap();
let filepath = row.get::<usize, String>(1).unwrap();
//println!("{} {}", id, filepath);
Ok((id, filepath))
}).unwrap().collect::<Vec<
RSResult<(i32, String)>>>();
let start_systemtime = SystemTime::now();
let start_instant = Instant::now();
let hashes: Vec<IOResult<(i32, Hash256, usize, Instant)>> = allpaths.par_iter().map(|fpres| {
if let Ok((id, filepath)) = fpres {
let input = File::open(filepath)?;
let reader = BufReader::new(input);
let (chash, size_bytes) = buffered_hash256(reader)?;
Ok((*id, chash, size_bytes, Instant::now()))
} else { // should never happen
Ok((0 as i32, Hash256([0; 32]), 0, Instant::now()))
}
}).collect();
let mut update_stmt = tx.prepare(
"UPDATE current_files
SET
content_sha256 = ?2,
size_bytes = ?3,
recorded_time = ?4
WHERE
id = ?1")?;
for hashresult in hashes {
if let Ok((id, hash, size_bytes, recorded_instant)) = hashresult {
update_stmt.execute((
id,
hash,
size_bytes,
persistent_stamp(recorded_instant, start_instant, start_systemtime),
))?;
}
}
Ok(())
}
/// Find latest entries in filedir_version that are not deleted, and that do not appear in
/// current_files, but whose parents _do_ appear. Create new versions marking these as deleted.
fn find_deleted(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
// Do a CTE from filedir to get key and relpath for all files below p (inclusive)
// For files in filedir but not in current_files, insert entries into temp table deleted_files
// marking them as deleted.
log::warn!("find_deleted not yet implemented");
Ok(())
}
/// Remove rows from deleted_files which were previously deleted in the database
fn prune_deleted(tx: &Transaction) -> Result<(), RecordError> {
// Join filedir_version to deleted_files to get latest deleted status, then drop
log::warn!("prune_deleted not yet implemented");
Ok(())
}
/// Compute directory hashes in the current_files table
fn compute_current_directory_hashes(tx: &Transaction) -> Result<(), RecordError> {
// Extract files with a CTE on current_files, in alphabetical order. For each row, if it's not
// a directory compare parent column to see whether we've changed to a new parent, in which
// case we finalize this parent dir by recording its size and hash. Once a directory is
// finalized, we can update its row in current_files. The recorded time of a directory should
// be the max of all children recorded times.
let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new();
// Find all entries in current_files that declare parents that are not None and are not in
// current_files. Each of these parents must be added with a new version. The content_hash must
// be derived from the existing and new file versions, which should be represented in
// filedir_version at this point. Add these in depth-first order.
log::warn!("update_current_directories not yet implemented");
Ok(())
}
/// Transfer rows in current_files and deleted_files into new rows in filedir_version
fn persist_temp_tables(tx: &Transaction, task_uuid: Uuid, dataset_uuid: Uuid) -> Result<(), RecordError> {
tx.execute("
INSERT OR IGNORE INTO
filedir
SELECT
sha256, ?, name, parent
FROM current_files
",
(dataset_uuid.as_bytes(),),
)?;
tx.execute("
INSERT INTO
filedir_version
SELECT
version_uuid, sha256, recorded_time, filetype, FALSE, symlink_target, content_sha256, ?
FROM current_files
",
(task_uuid.as_bytes(),),
)?;
Ok(())
}
pub fn record(
tx: &Transaction,
paths: &[PathBuf],
ds_root: &Path,
message: &str,
task_uuid: Uuid,
) -> Result<(), RecordError> {
log::info!(
"Recording path {:?} for dataset at {:?} with user-provided message \"{}\"",
paths,
ds_root,
message,
);
// get local dataset UUID
let u = Uuid::parse_str(
tx.prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid'")?
.query_row([], |row| {
let uuidstr: Result<String, RSError> = row.get(0);
uuidstr
})?.as_str()
)?;
log::debug!("Found dataset UUID: {}", u);
// compute dataset root key. This is the SHA256 of the UUID
let mut ctx = Context::new(&SHA256);
ctx.update(u.as_bytes());
let mut root_key: [u8; 32] = [0; 32];
root_key.clone_from_slice(ctx.finish().as_ref());
let root_key = Hash256(root_key);
log::info!("Root key is {}", root_key);
// This schema is like filedir joined with filedir_version
// TODO: revert this to a TEMP TABLE after it's debugged
tx.execute("CREATE TEMP TABLE current_files (
id INTEGER PRIMARY KEY NOT NULL, -- only used in this table
sha256 BLOB NOT NULL, -- will become the primary key on filedir
name TEXT NOT NULL, -- filename without path
filepath TEXT NOT NULL, -- local path to file
parent BLOB, -- hash referencing either filedir or this table
version_uuid BLOB NOT NULL, -- generated UUID v4 for this version
recorded_time REAL, -- float64 representation of unix timestamp
filetype INTEGER, -- corresponds to the FileType enum
-- deleted BOOL NOT NULL, -- deleted is always FALSE for this table
symlink_target TEXT,
size_bytes INTEGER, -- null for anything but files/dirs. For dirs, the sum of children
content_sha256 BLOB,
UNIQUE(sha256) -- prevent inserting same file twice
)", [])?;
for p in paths {
let p = p.canonicalize()?;
let mut ctx = Context::new(&SHA256);
ctx.update(u.as_bytes());
// manually compute the SHA256 key for this path, starting with the SHA256 of the dataset
// UUID to indicate the root directory, then taking the SHA256 of the parent plus filename,
// as we descend into subdirectories until we reach p.
let mut ancestors = LinkedList::new();
for a in p.ancestors() { // fill a stack of ancestor dirs
if a.canonicalize()? == ds_root.canonicalize()? { break };
ancestors.push_front(a);
}
let mut hashbuf: [u8; 32] = [0; 32];
let mut prev_key = root_key;
for a in ancestors {
// hash of parent hash + filename
let filename = a.file_name()
.ok_or(RecordError::CantGetFilename)?
.to_str().ok_or(RecordError::FilenameNotUTF8)?;
let mut ctx = Context::new(&SHA256);
ctx.update(&prev_key.0);
ctx.update(&filename.as_bytes());
hashbuf.clone_from_slice(ctx.finish().as_ref());
prev_key = Hash256(hashbuf);
}
walk_and_insert(tx, &p, prev_key)?;
// Find deleted files ONLY under the paths we are given
find_deleted(tx, &p, prev_key)?;
}
// MULTI-THREADED hash computation of all found files
insert_file_content_hashes(tx)?;
// after we've inserted all paths, prune deleted files to prevent double-recording deletions
prune_deleted(tx)?;
compute_current_directory_hashes(tx)?;
persist_temp_tables(tx, task_uuid, u)?;
// Drop the temporary current_files table
log::debug!("Dropping temp table current_files");
tx.execute("DROP TABLE current_files", ())?;
Ok(())
}