Compare commits

..

No commits in common. "bffef78291d1cd55fe45a18389a82398733a9fbd" and "12b669d59191f4a30c0d4af1975ad7026a3f5187" have entirely different histories.

6 changed files with 125 additions and 818 deletions

304
Cargo.lock generated
View File

@ -22,18 +22,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "arrayref"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544"
[[package]]
name = "arrayvec"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "atty"
version = "0.2.14"
@ -45,32 +33,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "blake3"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f"
dependencies = [
"arrayref",
"arrayvec",
"cc",
"cfg-if",
"constant_time_eq",
"digest",
]
[[package]]
name = "block-buffer"
version = "0.10.3"
@ -80,18 +48,6 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
[[package]]
name = "cc"
version = "1.0.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "581f5dba903aac52ea3feb5ec4810848460ee833876f1f9b0fdeab1f19091574"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -135,12 +91,6 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "convert_case"
version = "0.4.0"
@ -148,70 +98,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "crossbeam"
version = "0.8.2"
name = "cpufeatures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c"
checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f916dfc5d356b0ed9dae65f1db9fc9770aa2851d2662b988ccf4fe3516e86348"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd42583b04998a5363558e5f9291ee5a5ff6b49944332103f251e7479a82aa7"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edbafec5fa1f196ca66527c1b12c2ec4745ca14b50f1ad8f9f6f720b55d11fac"
dependencies = [
"cfg-if",
"libc",
]
[[package]]
@ -245,15 +137,8 @@ checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c"
dependencies = [
"block-buffer",
"crypto-common",
"subtle",
]
[[package]]
name = "either"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
[[package]]
name = "env_logger"
version = "0.9.1"
@ -339,25 +224,6 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "js-sys"
version = "0.3.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jwalk"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "172752e853a067cbce46427de8470ddf308af7fd8ceaf9b682ef31a5021b6bb9"
dependencies = [
"crossbeam",
"rayon",
]
[[package]]
name = "libc"
version = "0.2.135"
@ -389,43 +255,21 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
[[package]]
name = "nancy"
version = "0.1.0"
dependencies = [
"blake3",
"clap",
"derive_more",
"env_logger",
"jwalk",
"log",
"once_cell",
"rayon",
"ring",
"rusqlite",
"rusqlite_migration",
"sha2",
"uuid",
]
[[package]]
name = "num_cpus"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "once_cell"
version = "1.15.0"
@ -486,30 +330,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rayon"
version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d"
dependencies = [
"autocfg",
"crossbeam-deque",
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"num_cpus",
]
[[package]]
name = "regex"
version = "1.6.0"
@ -527,21 +347,6 @@ version = "0.6.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244"
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi",
]
[[package]]
name = "rusqlite"
version = "0.28.0"
@ -576,42 +381,35 @@ dependencies = [
"semver",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "semver"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
[[package]]
name = "sha2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "smallvec"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "1.0.102"
@ -644,12 +442,6 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "uuid"
version = "1.2.1"
@ -677,70 +469,6 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f"
[[package]]
name = "web-sys"
version = "0.3.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@ -18,15 +18,12 @@ name = "nancy"
path = "src/main.rs"
[dependencies]
blake3 = "1.3.1"
clap = { version = "4.0.14", features = ["derive"] }
derive_more = "0.99.17"
env_logger = "0.9.1"
jwalk = "0.6.0"
log = "0.4.17"
once_cell = "1.15.0"
rayon = "1.5.3"
ring = "0.16.20"
rusqlite = { version = "0.28.0", features = ["uuid"] }
rusqlite_migration = "1.0.0"
sha2 = "0.10.6"
uuid = { version = "1.2.1", features = ["v4"] }

405
src/fs.rs
View File

@ -1,407 +1,18 @@
use derive_more::{From};
use jwalk::{Error as JWalkError, WalkDir, WalkDirGeneric};
//use jwalk;
use log;
use rayon::prelude::*;
use ring::digest::{Context, SHA256};
use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes};
use uuid::{Uuid};
use std::collections::{LinkedList};
use std::fmt;
use std::fs::{File};
use std::io::{BufReader, Error as IOError, Read, Result as IOResult};
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime};
use crate::timing::{persistent_stamp};
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum FileType {
Other = 0, // char/block devices, FIFOs, sockets...
Regular = 1,
Directory = 2,
Symlink = 3,
}
impl From<std::fs::FileType> for FileType {
fn from(ft: std::fs::FileType) -> Self {
if ft.is_file() {
FileType::Regular
} else if ft.is_dir() {
FileType::Directory
} else if ft.is_symlink() {
FileType::Symlink
} else {
FileType::Other
}
}
}
impl fmt::Display for FileType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self);
Ok(())
}
}
//use sha2::{digest::FixedOutput, Digest, Sha256};
use std::path::Path;
#[derive(Debug)]
struct FSVersion {
uuid: Uuid,
//content_sha256: [u8; 32],
//filename: String,
//parent: [u8; 32],
}
impl FSVersion {
fn from_file(path: &Path, dataset_uuid: Uuid, dataset_root: &Path) -> FSVersion {
let u = Uuid::new_v4();
//let hasher = Sha256::new();
FSVersion {
uuid: u,
//filename: "",
//content_sha256: hasher.finalize(),
}
}
}
#[derive(Debug, From)]
pub enum RecordError {
SQLError(RSError),
IOError(IOError),
DirectoryWalkError(JWalkError),
UUIDParseError(uuid::Error),
CantGetFilename,
FilenameNotUTF8,
ParentHashNotSet,
TooBigDepthJump,
NotImplemented,
}
#[derive(Clone,Debug)]
struct FileInfo {
parent_sha256: Option<
[u8; 32]
>,
content_hash: [u8; 32],
}
impl Default for FileInfo {
fn default() -> Self {
FileInfo {
parent_sha256: None,
content_hash: [0; 32],
}
}
}
#[derive(Copy,Clone,Debug,From)]
pub struct Hash256([u8; 32]);
impl fmt::LowerHex for Hash256 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for byte in self.0 {
write!(f, "{:x}", byte);
}
Ok(())
}
}
impl fmt::UpperHex for Hash256 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for byte in self.0 {
write!(f, "{:X}", byte);
}
Ok(())
}
}
impl fmt::Display for Hash256 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for byte in self.0 {
write!(f, "{:x}", byte);
}
Ok(())
}
}
impl ToSql for Hash256 {
fn to_sql(&self) -> RSResult<rstypes::ToSqlOutput<'_>> {
Ok(rstypes::ToSqlOutput::from(&self.0[..]))
}
}
fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> {
let mut ctx = Context::new(&SHA256);
let mut buffer = [0; 1024 * 128];
let mut num_bytes: usize = 0;
loop {
let count = reader.read(&mut buffer)?;
if count == 0 {
break;
}
ctx.update(&buffer[..count]);
num_bytes += count;
}
let mut hash: [u8; 32] = [0; 32];
hash.clone_from_slice(ctx.finish().as_ref());
Ok((hash.into(), num_bytes))
}
/// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for
/// directories and insert entries into current_files table as we pass over them. This means
/// computing the sha256 key, storing it along with parent, filetype, and symlink_target.
fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
let mut insert_stmt = tx.prepare(
"INSERT OR IGNORE INTO current_files
(sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time)
VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")?;
let mut dirstack: Vec<Option<Hash256>> = vec![];
let mut parent: Option<Hash256> = None;
let mut lastkey: Option<Hash256> = None;
let mut depth: usize = 0;
let start_systemtime = SystemTime::now();
let start_instant = Instant::now();
log::trace!("walk_and_insert {:?}", p);
for entry_result in WalkDir::new(p)
.follow_links(false)
.skip_hidden(true)
.sort(true) {
let entry = entry_result?;
let path = entry.path();
let pathstr = path.to_str()
.expect("Path will be convertable to a UTF-8 string");
let filetype = FileType::from(entry.file_type());
let filename = path.file_name()
.expect("Path will end in a named component (unlike / or foo/..)")
.to_str()
.expect("File name will be convertable to a UTF-8 string");
log::debug!("{:?} : finding target", path);
let symlink_target = if filetype == FileType::Symlink {
Some(
std::fs::read_link(&path)?
.to_str()
.expect("Symlink target will be convertable to a UTF-8 string")
.to_string()
)
} else { None };
log::debug!("{:?} : target = {:?}", path, symlink_target);
if entry.depth > depth {
// descending into directory
if entry.depth != depth + 1 {
// we should never descend two or more steps at a time
return Err(RecordError::TooBigDepthJump);
}
//log::debug!("Down");
dirstack.push(parent);
parent = lastkey;
depth += 1;
}
while depth > entry.depth {
// finished processing subdirectory, going back up the stack
//log::debug!("Up");
parent = dirstack.pop().unwrap();
depth -= 1;
log::debug!(
" parent={} ed={} d={}",
&(match parent {
Some(h) => format!("{}", h),
None => "None".to_string(),
})[..8],
entry.depth,
depth,
);
}
let mut ctx = Context::new(&SHA256);
match parent {
Some(Hash256(b)) => {
ctx.update(&b[..]);
}
None => { // Root. hash is the hash of the uuid for this dataset
ctx.update(&path_key.0);
}
}
ctx.update(filename.as_bytes());
let mut thiskey: [u8; 32] = [0; 32];
thiskey.clone_from_slice(ctx.finish().as_ref());
let thiskey = Hash256(thiskey);
let ver_uuid = Uuid::new_v4();
insert_stmt.execute((
thiskey.0,
filename,
pathstr,
parent,
ver_uuid.as_bytes(),
filetype as u8,
symlink_target,
persistent_stamp(Instant::now(), start_instant, start_systemtime),
))?;
lastkey = Some(thiskey);
}
Ok(())
}
/// Compute content hashes for all files in the current_files temp table. Symlinks and "Other"
/// types will have NULL hashes, and directory hashes will be computed in a separate step.
fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> {
log::trace!("insert_file_content_hashes");
// Extract all regular files (key and path) from current_files, in arbitrary order, and compute
// content hashes for each, then update the corresponding entry in current_files.
let allpaths: Vec<RSResult<(i32, String)>> = tx.prepare(
&format!(
"SELECT id, filepath FROM current_files WHERE filetype == {}",
FileType::Regular as u8)
)?.query_map((), |row| {
let id = row.get::<usize, i32>(0).unwrap();
let filepath = row.get::<usize, String>(1).unwrap();
//println!("{} {}", id, filepath);
Ok((id, filepath))
}).unwrap().collect::<Vec<
RSResult<(i32, String)>>>();
let start_systemtime = SystemTime::now();
let start_instant = Instant::now();
let hashes: Vec<IOResult<(i32, Hash256, usize, Instant)>> = allpaths.par_iter().map(|fpres| {
if let Ok((id, filepath)) = fpres {
let input = File::open(filepath)?;
let mut reader = BufReader::new(input);
let (chash, size_bytes) = buffered_hash256(reader)?;
Ok((*id, chash, size_bytes, Instant::now()))
} else { // should never happen
Ok((0 as i32, Hash256([0; 32]), 0, Instant::now()))
}
}).collect();
let mut update_stmt = tx.prepare(
"UPDATE current_files
SET
content_sha256 = ?2,
size_bytes = ?3,
recorded_time = ?4
WHERE
id = ?1")?;
for hashresult in hashes {
if let Ok((id, hash, size_bytes, recorded_instant)) = hashresult {
update_stmt.execute((
id,
hash,
size_bytes,
persistent_stamp(recorded_instant, start_instant, start_systemtime),
))?;
}
}
Ok(())
}
pub fn record(
tx: &mut Transaction,
paths: &[PathBuf],
ds_root: &Path,
message: &str,
//task_uuid: Uuid,
) -> Result<(), RecordError> {
pub fn record(path: &Path, message: &String) -> Result<(), RecordError> {
log::info!(
"Recording path {:?} for dataset at {:?} with user-provided message \"{}\"",
paths,
ds_root,
message,
"Recording path {:?} with user-provided message \"{}\"",
path,
message
);
// get local dataset UUID
let u = Uuid::parse_str(
tx.prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid'")?
.query_row([], |row| {
let uuidstr: Result<String, RSError> = row.get(0);
uuidstr
})?.as_str()
)?;
log::debug!("Found dataset UUID: {}", u);
// compute dataset root key. This is the SHA256 of the UUID
let mut ctx = Context::new(&SHA256);
ctx.update(u.as_bytes());
let mut root_key: [u8; 32] = [0; 32];
root_key.clone_from_slice(ctx.finish().as_ref());
let root_key = Hash256(root_key);
log::info!("Root key is {}", root_key);
// This schema is like filedir joined with filedir_version
// TODO: revert this to a TEMP TABLE after it's debugged
tx.execute("CREATE
-- TEMP
TABLE current_files (
id INTEGER PRIMARY KEY NOT NULL, -- only used in this table
sha256 BLOB NOT NULL, -- will become the primary key on filedir
name TEXT NOT NULL, -- filename without path
filepath TEXT NOT NULL, -- local path to file
parent BLOB, -- hash referencing either filedir or this table
version_uuid BLOB NOT NULL, -- generated UUID v4 for this version
recorded_time REAL, -- float64 representation of unix timestamp
filetype INTEGER, -- corresponds to the FileType enum
-- deleted BOOL NOT NULL, -- deleted is always FALSE for this table
symlink_target TEXT,
size_bytes INTEGER, -- null for anything but files/dirs. For dirs, the sum of children
content_sha256 BLOB,
UNIQUE(sha256) -- prevent inserting same file twice
)", [])?;
for p in paths {
let p = p.canonicalize()?;
let mut ctx = Context::new(&SHA256);
ctx.update(u.as_bytes());
// manually compute the SHA256 key for this path, starting with the SHA256 of the dataset
// UUID to indicate the root directory, then taking the SHA256 of the parent plus filename,
// as we descend into subdirectories until we reach p.
let mut ancestors = LinkedList::new();
for a in p.ancestors() { // fill a stack of ancestor dirs
if a.canonicalize()? == ds_root.canonicalize()? { break };
ancestors.push_front(a);
}
let mut hashbuf: [u8; 32] = [0; 32];
let mut prev_key = root_key;
for a in ancestors {
// hash of parent hash + filename
let filename = a.file_name()
.ok_or(RecordError::CantGetFilename)?
.to_str().ok_or(RecordError::FilenameNotUTF8)?;
let mut ctx = Context::new(&SHA256);
ctx.update(&prev_key.0);
ctx.update(&filename.as_bytes());
hashbuf.clone_from_slice(ctx.finish().as_ref());
prev_key = Hash256(hashbuf);
}
walk_and_insert(tx, &p, prev_key)?; // SINGLE-THREADED
}
insert_file_content_hashes(tx)?; // MULTI-THREADED
// SINGLE-THREADED
// Extract files from current_files, in insertion order. For each row, if it's not a
// directory compare parent column to see whether we've changed to a new parent, in which
// case we finalize this parent dir by recording its size and hash. Once a directory is
// finalized, we can update its row in current_files. The recorded time of a directory
// should be the max of all children recorded times.
let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new();
// Find latest entries in filedir_version that are not deleted, and that do not appear in
// current_files, but whose parents _do_ appear. Create new versions marking these as deleted.
// For each entry in current_files, create a new filedir_version.
// Find all entries in current_files that declare parents that are not None and are not in
// current_files. Each of these parents must be added with a new version. The content_hash must
// be derived from the existing and new file versions, which should be represented in
// filedir_version at this point. Add these in depth-first order.
// Drop the temporary current_files table
Ok(())
Err(RecordError::NotImplemented)
}

View File

@ -1,8 +1,7 @@
use clap::{Parser, Subcommand};
use rusqlite::{Connection, OpenFlags};
use uuid::{Uuid};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::process;
// Composable provenance tracking for scientific data analysis
@ -10,24 +9,17 @@ use std::process;
#[command(author, version, about, long_about = None, arg_required_else_help=true)]
struct Cli {
#[command(subcommand)]
command: Option<Command>,
command: Option<Commands>,
}
#[derive(Subcommand)]
enum Command {
/// Initialize a new dataset
#[command()]
Init {
/// A short descriptive name for the dataset
#[arg(short, long)]
name: String,
/// The top level directory of the dataset (must be a directory)
#[arg(default_value=".")]
dataset_path: PathBuf,
},
enum Commands {
/// Record changes to files/directories within a dataset (or create a new dataset)
#[command()]
Record {
/// If not within an existing dataset, create one
#[arg(short, long)]
initialize: bool,
/// A short descriptive message for this recording, i.e. "Re-run with lr=1e-3"
#[arg(short, long)]
message: String,
@ -43,54 +35,12 @@ enum Command {
},
}
fn init_schema(conn: &mut Connection) -> Uuid {
match nancy::db::init(conn) {
Err(e) => {
log::error!("Encountered error in initializing schema: {:?}", e);
process::exit(1);
}
Ok(dataset_uuid) => {
log::trace!("Init OK");
log::info!("Dataset UUID is {dataset_uuid}");
// Run an empty program so that the dataset log reflects when it was
// initialized
nancy::program::with_program(
conn,
"INIT",
"Initialize dataset",
|prog, _tx| {
let _ = prog.perform_task(&[], |task| {
log::debug!("INIT task UUID is {}", task.uuid);
});
let okres: Result<(), ()> = Ok(());
okres
},
)
.expect("Empty program should not throw error");
dataset_uuid
}
}
}
fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) -> () {
if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog, tx| {
prog.perform_task(&[], |_task| {
// Note that this may fail, in which case we should roll back only this program
// but keep the dataset initialized.
nancy::fs::record(tx, paths, &dataset_path, message)
})
}) {
log::error!("Encountered error in RECORD program: {:?}", e);
process::exit(1);
};
}
fn main() {
env_logger::init();
let args = Cli::parse();
match &args.command {
Some(Command::Hello {}) => {
Some(Commands::Hello {}) => {
println!("Hello from nancy (binary)!");
match nancy::print_uuid() {
Ok(_) => {
@ -101,55 +51,63 @@ fn main() {
}
};
}
Some(Command::Init {
name,
dataset_path
}) => {
if !dataset_path.is_dir() {
log::error!("Path {:?} does not point to an existing directory", dataset_path);
process::exit(1);
}
let dbpath = &dataset_path.join("nancy.db");
if dbpath.exists() {
log::error!("Database {:?} exists, indicating this dataset is already \
initialized. Refusing to overwrite.", dbpath);
process::exit(1);
}
log::info!("Initializing new database at {:?}", dbpath);
let mut conn = match Connection::open(dbpath) {
Err(e) => {
log::error!(
"Could not open new SQLite database at {dbpath:?}: {:?}",
e
);
process::exit(1);
}
Ok(cc) => cc,
};
conn.pragma_update(None, "foreign_keys", &"ON").unwrap();
let u = init_schema(&mut conn);
do_record(
&mut conn,
&format!("Initial record of dataset {:?}", u),
&[dataset_path.to_path_buf()],
dataset_path,
);
}
Some(Command::Record {
Some(Commands::Record {
initialize,
message,
record_paths,
}) => {
// If no paths are given, use ["."] for the following steps.
// Determine dataset dir (ds_dir)
match nancy::db::find_dataset_dir(record_paths) {
let mut conn = match nancy::db::find_dataset_dir(record_paths) {
Err(nancy::db::FindDatasetError::NoDataset(path)) => {
log::info!("No dataset at or above nearest ancestor path: {:?}", path);
log::error!(
"Refusing to initialize a new dataset at {path:?}. \
Use `nancy init` to create a dataset instead."
);
process::exit(1);
if !initialize {
log::error!(
"Refusing to initialize a new dataset at {path:?}. \
Pass the -i or --initialize flag to request initialization."
);
process::exit(1);
}
let dbpath = &path.join("nancy.db");
log::info!("Initializing new database at {:?}", dbpath);
let mut c = match Connection::open(dbpath) {
Err(e) => {
log::error!(
"Could not open new SQLite database at {dbpath:?}: {:?}",
e
);
process::exit(1);
}
Ok(cc) => cc,
};
c.pragma_update(None, "foreign_keys", &"ON").unwrap();
match nancy::db::init(&mut c) {
Err(e) => {
log::error!("Encountered error in initializing schema: {:?}", e);
process::exit(1);
}
Ok(dataset_uuid) => {
log::trace!("Init OK");
log::info!("Dataset UUID is {dataset_uuid}");
// Run an empty program so that the dataset log reflects when it was
// initialized
nancy::program::with_program(
&mut c,
"INIT",
"Initialize dataset",
|prog| {
let _ = prog.perform_task(&[], |task| {
log::debug!("INIT task UUID is {}", task.uuid);
});
let okres: Result<(), ()> = Ok(());
okres
},
)
.expect("Empty program should not throw error");
}
}
c
}
Err(e) => {
log::error!("Could not determine dataset directory: {:?}", e);
@ -160,7 +118,7 @@ fn main() {
log::info!("Found existing dataset at path: {:?}", path);
let dbpath = &path.join("nancy.db");
// open with flags to prevent creating when we believe the db exists
let mut conn = match Connection::open_with_flags(
let mut c = match Connection::open_with_flags(
dbpath,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
) {
@ -172,8 +130,8 @@ fn main() {
}
Ok(cc) => cc,
};
conn.pragma_update(None, "foreign_keys", &"ON").unwrap();
match nancy::db::ensure_schema(&mut conn) {
c.pragma_update(None, "foreign_keys", &"ON").unwrap();
match nancy::db::ensure_schema(&mut c) {
Err(e) => {
log::error!("Error ensuring schema: {}", e);
process::exit(1);
@ -182,12 +140,23 @@ fn main() {
log::debug!("Schema ensured: {:?}", update_result);
}
}
do_record(&mut conn, message, record_paths.as_slice(), &path);
c
}
};
if let Err(e) = nancy::program::with_program(&mut conn, "RECORD", message, |prog| {
prog.perform_task(&[], |_task| {
let dataset_path = PathBuf::from(".");
// Note that this may fail, in which case we should roll back only this program
// but keep the dataset initialized.
nancy::fs::record(&dataset_path, message)
})
}) {
log::error!("Encountered error in RECORD program: {:?}", e);
process::exit(1);
};
}
Some(Command::Status {}) => {
Some(Commands::Status {}) => {
println!("status not yet implemented");
}
None => {}

View File

@ -1,5 +1,5 @@
-- Simple key/value table describing _this_ dataset (not imported ones).
-- In particular, the key "local_dataset" should be the UUID of the dataset in this
-- Simple key/value table describing _this_ store (not imported ones).
-- In particular, the key "local_store" should be the UUID of the store in this
-- directory.
CREATE TABLE local_metadata(
key TEXT PRIMARY KEY NOT NULL,
@ -54,8 +54,8 @@ CREATE INDEX FK_user_machine ON user (machine);
-- Stores and files (and directories)
-- These are the primary objects tracked by nancy.
-- A dataset is a directory containing a file called nancy.db (e.g. the dir holding this database)
-- In order to reliably merge dataset entries (like when we have converging
-- A store is a directory containing a file called nancy.db (e.g. the dir holding this database)
-- In order to reliably merge store entries (like when we have converging
-- dependencies), we need to deduplicate. The machine and path could match, for
-- example if a database is created in one location then copied elsewhere,
-- followed by regenerating the original database. In these cases, we would want
@ -67,29 +67,29 @@ CREATE INDEX FK_user_machine ON user (machine);
-- Client code should generate random UUIDs in the RFC 4122 variant layout.
-- https://datatracker.ietf.org/doc/html/rfc4122.html
-- This is possible in Python by simply calling uuid.uuid4() with no arguments
CREATE TABLE dataset (
CREATE TABLE store (
uuid BLOB PRIMARY KEY NOT NULL -- UUID generated by uuid.uuid4()
);
-- The filedir table holds all files and directories that are tracked by the
-- dataset. This table also holds tracked files and directories that have been
-- imported and live outside the current dataset.
-- store. This table also holds tracked files and directories that have been
-- imported and live outside the current store.
-- We do not support renaming files.
CREATE TABLE filedir (
sha256 BLOB PRIMARY KEY NOT NULL,
dataset BLOB NOT NULL REFERENCES dataset ON UPDATE CASCADE,
store BLOB NOT NULL REFERENCES store ON UPDATE CASCADE,
name TEXT, -- only a filename, not a path
parent BLOB REFERENCES filedir ON UPDATE CASCADE,
UNIQUE(dataset, name, parent)
UNIQUE(store, name, parent)
);
CREATE INDEX FK_filedir_dataset ON filedir (dataset);
CREATE INDEX FK_filedir_store ON filedir (store);
CREATE INDEX FK_filedir_parent ON filedir (parent);
-- Detect cross-dataset references
-- Detect cross-store references
CREATE TRIGGER insert_filedir BEFORE INSERT ON filedir
BEGIN SELECT CASE
WHEN NEW.parent IS NOT NULL AND NEW.dataset != (SELECT dataset FROM filedir WHERE sha256 = NEW.parent)
THEN RAISE (ABORT, 'Parent resides in different dataset')
WHEN NEW.parent IS NOT NULL AND NEW.store != (SELECT store FROM filedir WHERE sha256 = NEW.parent)
THEN RAISE (ABORT, 'Parent resides in different store')
END; END;
CREATE TRIGGER update_filedir BEFORE UPDATE ON filedir
BEGIN
@ -112,8 +112,12 @@ CREATE TABLE filedir_version (
-- Note that changing filetype (e.g. directory becomes file) or deleting a
-- file are simply just new versions of a filedir.
filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc.
filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
deleted BOOL NOT NULL, -- set True when recording a deleted file
-- We record the permissions on each file to enable fixing if needed
perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
symlink_target TEXT, -- if this is a symlink, this is the (read but not fully resolved) target. i.e. this is the "content" of the symlink.
-- The following hash can be NULL if the file was deleted. It could also be

View File

@ -1,6 +1,7 @@
use log;
use rusqlite::{Connection, Error as RSQLError, Transaction};
extern crate derive_more;
use derive_more::From;
use uuid::Uuid;
use std::time::{Instant, SystemTime};
@ -30,7 +31,6 @@ impl Program {
pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Program {
let u = Uuid::new_v4();
log::debug!("New {} Program with UUID {}", name, u);
// TODO: Insert this program into the database
Program {
uuid: u,
name: name.to_string(),
@ -67,7 +67,7 @@ impl<E> From<E> for ProgramError<E> {
}
/// Run a closure as a program, within a database transaction
pub fn with_program<E, F: FnOnce(Program, &mut Transaction) -> Result<(), E>>(
pub fn with_program<E, F: FnOnce(Program) -> Result<(), E>>(
conn: &mut Connection,
name: &str,
message: &str,
@ -95,14 +95,7 @@ pub fn with_program<E, F: FnOnce(Program, &mut Transaction) -> Result<(), E>>(
log::debug!(target: log_target, "Starting timers");
let start_st = SystemTime::now();
let start = Instant::now();
let start_stamp = timing::persistent_stamp(start, start, start_st);
log::debug!(
target: log_target,
"Start time: {:?} f64 timestamp={:#?}",
start_st,
start_stamp
);
let start_stamp = timing::persistent_stamp(Instant::now(), start, start_st);
// Instantiate Program
// (record name and message for new program, get program ID)
@ -111,11 +104,17 @@ pub fn with_program<E, F: FnOnce(Program, &mut Transaction) -> Result<(), E>>(
log::info!(target: log_target, "Running {:?}", prog);
// run closure with program argument
f(prog, &mut tx)?; // if closure fails, transaction will be rolled back
f(prog)?; // if closure fails, transaction will be rolled back
// end timer
// Commit scope for RECORD program
let end = Instant::now();
log::debug!(
target: log_target,
"Start time: {:?} f64 timestamp={:#?}",
start_st,
start_stamp
);
log::debug!(
target: log_target,
"Elapsed: {} seconds",
@ -123,7 +122,6 @@ pub fn with_program<E, F: FnOnce(Program, &mut Transaction) -> Result<(), E>>(
);
// record end time
// commit transaction
log::debug!(target: log_target, "Committing transaction and finalizing program");
match tx.commit() {
Err(e) => Err(ProgramError::CommitFailed(e)),
Ok(_) => Ok(()),