First draft of record() command.

So far, this can record content hashes and recorded times/filetypes, as
well as symlink targets into a temp table. There is no detection of
deleted files, and non-root paths are left dangling. Also, no directory
content hashes are computed.

Currently I am using sha256 of parent sha256 + filename as the key in
filedir. This is pretty wasteful as they are 32 bytes each, and since
each filedir entry has a parent (except the root), this is 64 bytes for
each entry just in keys (using ints this would be just a few bytes
instead). Maintaining a low probability of collision is important for a
distributed system like this where we envision importing datasets into
possibly very large master databases. However, the sha256 itself starts
from a 128-bit UUID for the root's parent, so the keys already carry no
more than that much collision resistance (which is still very large).

Moving to UUIDs for keys is attractive for that 2x savings in space; the
obvious candidate would be v5 UUIDs, which are derived from a UUID
namespace and a string. The string could be the relative path for each
filedir entry. Alternatively, we could use the parent's UUID as
namespace and the filename as the string, similar to how we use those
bits of information now to compute the SHA256 hash.

Changing to UUID (v5) keys for filedir, in addition to the current UUID
(v4) keys for filedir_version, suggests that perhaps we should switch to
UUID (v5) for all our other keys. Tables with deterministic sha256 keys
are:
 - machine
 - user
 - filedir
 - environment
 - package
 - module
 - func
Each of these is derived from some bit of information that's typically
also present in each row of the table. In other words, the sha256 is
there as a convenient way to avoid using multi-column primary keys. But
it may double or triple some of these table sizes, so we should really
consider using a minimal hash, or in some cases we could consider other
alternatives such as integer primary keys. I've avoided that in this
rewrite so far since it complicates importing, but it's likely the most
space-efficient.
This commit is contained in:
Jacob Hinkle 2022-11-09 12:48:35 -05:00
parent e5fa7a32f2
commit bffef78291
5 changed files with 806 additions and 109 deletions

304
Cargo.lock generated
View File

@ -22,6 +22,18 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "arrayref"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544"
[[package]]
name = "arrayvec"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -33,12 +45,32 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.3.2" version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "blake3"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f"
dependencies = [
"arrayref",
"arrayvec",
"cc",
"cfg-if",
"constant_time_eq",
"digest",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.10.3" version = "0.10.3"
@ -48,6 +80,18 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "bumpalo"
version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
[[package]]
name = "cc"
version = "1.0.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "581f5dba903aac52ea3feb5ec4810848460ee833876f1f9b0fdeab1f19091574"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
@ -91,6 +135,12 @@ dependencies = [
"os_str_bytes", "os_str_bytes",
] ]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]] [[package]]
name = "convert_case" name = "convert_case"
version = "0.4.0" version = "0.4.0"
@ -98,12 +148,70 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]] [[package]]
name = "cpufeatures" name = "crossbeam"
version = "0.2.5" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320" checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c"
dependencies = [ dependencies = [
"libc", "cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f916dfc5d356b0ed9dae65f1db9fc9770aa2851d2662b988ccf4fe3516e86348"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd42583b04998a5363558e5f9291ee5a5ff6b49944332103f251e7479a82aa7"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edbafec5fa1f196ca66527c1b12c2ec4745ca14b50f1ad8f9f6f720b55d11fac"
dependencies = [
"cfg-if",
] ]
[[package]] [[package]]
@ -137,8 +245,15 @@ checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c"
dependencies = [ dependencies = [
"block-buffer", "block-buffer",
"crypto-common", "crypto-common",
"subtle",
] ]
[[package]]
name = "either"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
[[package]] [[package]]
name = "env_logger" name = "env_logger"
version = "0.9.1" version = "0.9.1"
@ -224,6 +339,25 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "js-sys"
version = "0.3.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jwalk"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "172752e853a067cbce46427de8470ddf308af7fd8ceaf9b682ef31a5021b6bb9"
dependencies = [
"crossbeam",
"rayon",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.135" version = "0.2.135"
@ -255,21 +389,43 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "nancy" name = "nancy"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"blake3",
"clap", "clap",
"derive_more", "derive_more",
"env_logger", "env_logger",
"jwalk",
"log", "log",
"once_cell", "once_cell",
"rayon",
"ring",
"rusqlite", "rusqlite",
"rusqlite_migration", "rusqlite_migration",
"sha2",
"uuid", "uuid",
] ]
[[package]]
name = "num_cpus"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
"hermit-abi",
"libc",
]
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.15.0" version = "1.15.0"
@ -330,6 +486,30 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "rayon"
version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d"
dependencies = [
"autocfg",
"crossbeam-deque",
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"num_cpus",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.6.0" version = "1.6.0"
@ -347,6 +527,21 @@ version = "0.6.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244"
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi",
]
[[package]] [[package]]
name = "rusqlite" name = "rusqlite"
version = "0.28.0" version = "0.28.0"
@ -381,35 +576,42 @@ dependencies = [
"semver", "semver",
] ]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]] [[package]]
name = "semver" name = "semver"
version = "1.0.14" version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
[[package]]
name = "sha2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.10.0" version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.10.0" version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.102" version = "1.0.102"
@ -442,6 +644,12 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.2.1" version = "1.2.1"
@ -469,6 +677,70 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f"
[[package]]
name = "web-sys"
version = "0.3.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"

View File

@ -18,12 +18,15 @@ name = "nancy"
path = "src/main.rs" path = "src/main.rs"
[dependencies] [dependencies]
blake3 = "1.3.1"
clap = { version = "4.0.14", features = ["derive"] } clap = { version = "4.0.14", features = ["derive"] }
derive_more = "0.99.17" derive_more = "0.99.17"
env_logger = "0.9.1" env_logger = "0.9.1"
jwalk = "0.6.0"
log = "0.4.17" log = "0.4.17"
once_cell = "1.15.0" once_cell = "1.15.0"
rayon = "1.5.3"
ring = "0.16.20"
rusqlite = { version = "0.28.0", features = ["uuid"] } rusqlite = { version = "0.28.0", features = ["uuid"] }
rusqlite_migration = "1.0.0" rusqlite_migration = "1.0.0"
sha2 = "0.10.6"
uuid = { version = "1.2.1", features = ["v4"] } uuid = { version = "1.2.1", features = ["v4"] }

409
src/fs.rs
View File

@ -1,18 +1,407 @@
//use jwalk; use derive_more::{From};
use jwalk::{Error as JWalkError, WalkDir, WalkDirGeneric};
use log; use log;
//use sha2::{digest::FixedOutput, Digest, Sha256}; use rayon::prelude::*;
use std::path::Path; use ring::digest::{Context, SHA256};
use rusqlite::{Error as RSError, Result as RSResult, ToSql, Transaction, types as rstypes};
use uuid::{Uuid};
use std::collections::{LinkedList};
use std::fmt;
use std::fs::{File};
use std::io::{BufReader, Error as IOError, Read, Result as IOResult};
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime};
use crate::timing::{persistent_stamp};
/// Kind of filesystem entry, stored as a small integer in the `filetype` column.
///
/// The explicit discriminants are what gets written to the database (via `as u8`),
/// so they must stay stable.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum FileType {
    Other = 0, // char/block devices, FIFOs, sockets...
    Regular = 1,
    Directory = 2,
    Symlink = 3,
}

impl From<std::fs::FileType> for FileType {
    /// Classify a standard-library file type; anything that is not a regular
    /// file, directory, or symlink maps to [`FileType::Other`].
    fn from(ft: std::fs::FileType) -> Self {
        if ft.is_file() {
            FileType::Regular
        } else if ft.is_dir() {
            FileType::Directory
        } else if ft.is_symlink() {
            FileType::Symlink
        } else {
            FileType::Other
        }
    }
}

impl fmt::Display for FileType {
    /// Print the variant name (same text as the derived `Debug` output).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Return the result of `write!` so that a failing writer is reported
        // instead of being silently turned into `Ok(())`.
        write!(f, "{:?}", self)
    }
}
#[derive(Debug)] #[derive(Debug)]
/// Placeholder for one recorded version of a filesystem entry.
///
/// Only the version UUID is stored so far; the commented-out fields sketch data
/// that is not populated yet.
struct FSVersion {
// Identifier of this version.
uuid: Uuid,
//content_sha256: [u8; 32],
//filename: String,
//parent: [u8; 32],
}
impl FSVersion {
    /// Build a version record carrying a freshly generated random (v4) UUID.
    ///
    /// The arguments are accepted but not consulted yet: no content hash or
    /// filename is derived from them at this stage.
    fn from_file(path: &Path, dataset_uuid: Uuid, dataset_root: &Path) -> FSVersion {
        let version_id = Uuid::new_v4();
        Self { uuid: version_id }
    }
}
#[derive(Debug, From)]
pub enum RecordError { pub enum RecordError {
SQLError(RSError),
IOError(IOError),
DirectoryWalkError(JWalkError),
UUIDParseError(uuid::Error),
CantGetFilename,
FilenameNotUTF8,
ParentHashNotSet,
TooBigDepthJump,
NotImplemented, NotImplemented,
} }
pub fn record(path: &Path, message: &String) -> Result<(), RecordError> { #[derive(Clone,Debug)]
log::info!( struct FileInfo {
"Recording path {:?} with user-provided message \"{}\"", parent_sha256: Option<
path, [u8; 32]
message >,
); content_hash: [u8; 32],
Err(RecordError::NotImplemented) }
impl Default for FileInfo {
fn default() -> Self {
FileInfo {
parent_sha256: None,
content_hash: [0; 32],
}
}
}
/// A 256-bit digest held as 32 raw bytes.
///
/// In this file it carries SHA-256 results: both file content hashes and the
/// keys derived from a parent key plus a filename. The derived `From` allows
/// building one from a `[u8; 32]` with `.into()`.
#[derive(Copy,Clone,Debug,From)]
pub struct Hash256([u8; 32]);
impl fmt::LowerHex for Hash256 {
    /// Write the digest as 64 lowercase hexadecimal characters.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        for byte in self.0.iter() {
            // Two zero-padded digits per byte: without the padding a byte such
            // as 0x0a would print as "a", making the output ambiguous and of
            // variable length. `?` surfaces writer errors instead of dropping them.
            write!(f, "{:02x}", byte)?;
        }
        Ok(())
    }
}
impl fmt::UpperHex for Hash256 {
    /// Write the digest as 64 uppercase hexadecimal characters.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        for byte in self.0.iter() {
            // Two zero-padded digits per byte: without the padding a byte such
            // as 0x0A would print as "A", making the output ambiguous and of
            // variable length. `?` surfaces writer errors instead of dropping them.
            write!(f, "{:02X}", byte)?;
        }
        Ok(())
    }
}
impl fmt::Display for Hash256 {
    /// Write the digest as 64 lowercase hexadecimal characters.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        for byte in self.0.iter() {
            // Zero-pad every byte to two digits so the text is always exactly
            // 64 characters and maps back to a unique digest; propagate writer
            // errors rather than discarding them.
            write!(f, "{:02x}", byte)?;
        }
        Ok(())
    }
}
impl ToSql for Hash256 {
    /// Bind the digest as a borrowed byte slice (no copy of the 32 bytes).
    fn to_sql(&self) -> RSResult<rstypes::ToSqlOutput<'_>> {
        let raw: &[u8] = &self.0[..];
        let bound = rstypes::ToSqlOutput::from(raw);
        Ok(bound)
    }
}
fn buffered_hash256<'a, R: Read>(mut reader: R) -> IOResult<(Hash256, usize)> {
let mut ctx = Context::new(&SHA256);
let mut buffer = [0; 1024 * 128];
let mut num_bytes: usize = 0;
loop {
let count = reader.read(&mut buffer)?;
if count == 0 {
break;
}
ctx.update(&buffer[..count]);
num_bytes += count;
}
let mut hash: [u8; 32] = [0; 32];
hash.clone_from_slice(ctx.finish().as_ref());
Ok((hash.into(), num_bytes))
}
/// Walk the directory while sorting by filename, maintaining a stack of hashes (keys) for
/// directories and insert entries into current_files table as we pass over them. This means
/// computing the sha256 key, storing it along with parent, filetype, and symlink_target.
/// Walk `p` (sorted by filename, not following symlinks, skipping hidden
/// entries) and insert one row per entry into `current_files`.
///
/// Each entry's key is the SHA-256 of its parent's key followed by its file
/// name; the walk root uses `path_key` in place of a parent key and is stored
/// with a NULL parent. A stack of parent keys is maintained as the walk descends
/// into and climbs out of directories.
///
/// # Errors
/// * `RecordError::CantGetFilename` if an entry's path has no final component.
/// * `RecordError::FilenameNotUTF8` if a path, file name, or symlink target is
///   not valid UTF-8.
/// * `RecordError::TooBigDepthJump` if the walk descends more than one level
///   between consecutive entries.
/// * Converted directory-walk, I/O, and SQLite errors.
fn walk_and_insert(tx: &mut Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
    let mut insert_stmt = tx.prepare(
        "INSERT OR IGNORE INTO current_files
        (sha256, name, filepath, parent, version_uuid, filetype, symlink_target, recorded_time)
        VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")?;
    // Keys of the enclosing directories above `parent`, innermost last.
    let mut dirstack: Vec<Option<Hash256>> = vec![];
    let mut parent: Option<Hash256> = None;
    let mut lastkey: Option<Hash256> = None;
    let mut depth: usize = 0;

    let start_systemtime = SystemTime::now();
    let start_instant = Instant::now();

    log::trace!("walk_and_insert {:?}", p);
    for entry_result in WalkDir::new(p)
        .follow_links(false)
        .skip_hidden(true)
        .sort(true)
    {
        let entry = entry_result?;
        let path = entry.path();
        // Report unusable paths through RecordError (as `record` does) rather
        // than panicking on data that comes straight from the filesystem.
        let pathstr = path.to_str().ok_or(RecordError::FilenameNotUTF8)?;
        let filetype = FileType::from(entry.file_type());
        let filename = path
            .file_name()
            .ok_or(RecordError::CantGetFilename)?
            .to_str()
            .ok_or(RecordError::FilenameNotUTF8)?;

        log::debug!("{:?} : finding target", path);
        let symlink_target = if filetype == FileType::Symlink {
            Some(
                std::fs::read_link(&path)?
                    .to_str()
                    .ok_or(RecordError::FilenameNotUTF8)?
                    .to_string(),
            )
        } else {
            None
        };
        log::debug!("{:?} : target = {:?}", path, symlink_target);

        if entry.depth > depth {
            // descending into directory
            if entry.depth != depth + 1 {
                // we should never descend two or more steps at a time
                return Err(RecordError::TooBigDepthJump);
            }
            dirstack.push(parent);
            parent = lastkey;
            depth += 1;
        }
        while depth > entry.depth {
            // finished processing subdirectory, going back up the stack
            parent = dirstack
                .pop()
                .expect("dirstack holds exactly one entry per level of depth");
            depth -= 1;
            log::debug!(
                " parent={} ed={} d={}",
                // Checked slicing: the literal "None" is shorter than 8 bytes.
                parent
                    .map(|h| h.to_string())
                    .as_deref()
                    .and_then(|s| s.get(..8))
                    .unwrap_or("None"),
                entry.depth,
                depth,
            );
        }

        let mut ctx = Context::new(&SHA256);
        match parent {
            Some(Hash256(b)) => {
                ctx.update(&b[..]);
            }
            None => {
                // Root of this walk: seed with the key supplied by the caller.
                ctx.update(&path_key.0);
            }
        }
        ctx.update(filename.as_bytes());
        let mut thiskey = [0u8; 32];
        thiskey.copy_from_slice(ctx.finish().as_ref());
        let thiskey = Hash256(thiskey);

        let ver_uuid = Uuid::new_v4();

        insert_stmt.execute((
            thiskey.0,
            filename,
            pathstr,
            parent,
            ver_uuid.as_bytes(),
            filetype as u8,
            symlink_target,
            persistent_stamp(Instant::now(), start_instant, start_systemtime),
        ))?;

        // Becomes the parent key if the next entry is one level deeper.
        lastkey = Some(thiskey);
    }
    Ok(())
}
/// Compute content hashes for all files in the current_files temp table. Symlinks and "Other"
/// types will have NULL hashes, and directory hashes will be computed in a separate step.
/// Compute content hashes for all regular files in the `current_files` table
/// and store hash, size, and recorded time back into their rows.
///
/// Files are hashed in parallel; the table updates run afterwards on the
/// calling thread. A file that cannot be opened or read is logged with a
/// warning and left without a content hash, so one unreadable file does not
/// abort the whole recording.
///
/// # Errors
/// Returns the converted SQLite error if selecting the rows or updating the
/// table fails.
fn insert_file_content_hashes(tx: &mut Transaction) -> Result<(), RecordError> {
    log::trace!("insert_file_content_hashes");
    // Extract all regular files (id and path) from current_files, in arbitrary
    // order. Row and query errors are propagated rather than unwrapped.
    let regular_files: Vec<(i32, String)> = tx
        .prepare(&format!(
            "SELECT id, filepath FROM current_files WHERE filetype == {}",
            FileType::Regular as u8
        ))?
        .query_map((), |row| {
            Ok((row.get::<usize, i32>(0)?, row.get::<usize, String>(1)?))
        })?
        .collect::<RSResult<Vec<(i32, String)>>>()?;

    let start_systemtime = SystemTime::now();
    let start_instant = Instant::now();

    // Hash every file in parallel, keeping each outcome paired with its row so
    // failures can be reported with the offending path.
    let hashed: Vec<(&(i32, String), IOResult<(Hash256, usize, Instant)>)> = regular_files
        .par_iter()
        .map(|row| {
            let outcome = File::open(&row.1)
                .and_then(|input| buffered_hash256(BufReader::new(input)))
                .map(|(chash, size_bytes)| (chash, size_bytes, Instant::now()));
            (row, outcome)
        })
        .collect();

    let mut update_stmt = tx.prepare(
        "UPDATE current_files
        SET
        content_sha256 = ?2,
        size_bytes = ?3,
        recorded_time = ?4
        WHERE
        id = ?1")?;
    for (row, outcome) in hashed {
        match outcome {
            Ok((hash, size_bytes, recorded_instant)) => {
                update_stmt.execute((
                    row.0,
                    hash,
                    size_bytes,
                    persistent_stamp(recorded_instant, start_instant, start_systemtime),
                ))?;
            }
            Err(e) => {
                log::warn!("Could not hash {:?}, leaving content hash unset: {}", row.1, e);
            }
        }
    }
    Ok(())
}
pub fn record(
tx: &mut Transaction,
paths: &[PathBuf],
ds_root: &Path,
message: &str,
//task_uuid: Uuid,
) -> Result<(), RecordError> {
log::info!(
"Recording path {:?} for dataset at {:?} with user-provided message \"{}\"",
paths,
ds_root,
message,
);
// get local dataset UUID
let u = Uuid::parse_str(
tx.prepare("SELECT value FROM local_metadata WHERE key = 'dataset_uuid'")?
.query_row([], |row| {
let uuidstr: Result<String, RSError> = row.get(0);
uuidstr
})?.as_str()
)?;
log::debug!("Found dataset UUID: {}", u);
// compute dataset root key. This is the SHA256 of the UUID
let mut ctx = Context::new(&SHA256);
ctx.update(u.as_bytes());
let mut root_key: [u8; 32] = [0; 32];
root_key.clone_from_slice(ctx.finish().as_ref());
let root_key = Hash256(root_key);
log::info!("Root key is {}", root_key);
// This schema is like filedir joined with filedir_version
// TODO: revert this to a TEMP TABLE after it's debugged
tx.execute("CREATE
-- TEMP
TABLE current_files (
id INTEGER PRIMARY KEY NOT NULL, -- only used in this table
sha256 BLOB NOT NULL, -- will become the primary key on filedir
name TEXT NOT NULL, -- filename without path
filepath TEXT NOT NULL, -- local path to file
parent BLOB, -- hash referencing either filedir or this table
version_uuid BLOB NOT NULL, -- generated UUID v4 for this version
recorded_time REAL, -- float64 representation of unix timestamp
filetype INTEGER, -- corresponds to the FileType enum
-- deleted BOOL NOT NULL, -- deleted is always FALSE for this table
symlink_target TEXT,
size_bytes INTEGER, -- null for anything but files/dirs. For dirs, the sum of children
content_sha256 BLOB,
UNIQUE(sha256) -- prevent inserting same file twice
)", [])?;
for p in paths {
let p = p.canonicalize()?;
let mut ctx = Context::new(&SHA256);
ctx.update(u.as_bytes());
// manually compute the SHA256 key for this path, starting with the SHA256 of the dataset
// UUID to indicate the root directory, then taking the SHA256 of the parent plus filename,
// as we descend into subdirectories until we reach p.
let mut ancestors = LinkedList::new();
for a in p.ancestors() { // fill a stack of ancestor dirs
if a.canonicalize()? == ds_root.canonicalize()? { break };
ancestors.push_front(a);
}
let mut hashbuf: [u8; 32] = [0; 32];
let mut prev_key = root_key;
for a in ancestors {
// hash of parent hash + filename
let filename = a.file_name()
.ok_or(RecordError::CantGetFilename)?
.to_str().ok_or(RecordError::FilenameNotUTF8)?;
let mut ctx = Context::new(&SHA256);
ctx.update(&prev_key.0);
ctx.update(&filename.as_bytes());
hashbuf.clone_from_slice(ctx.finish().as_ref());
prev_key = Hash256(hashbuf);
}
walk_and_insert(tx, &p, prev_key)?; // SINGLE-THREADED
}
insert_file_content_hashes(tx)?; // MULTI-THREADED
// SINGLE-THREADED
// Extract files from current_files, in insertion order. For each row, if it's not a
// directory compare parent column to see whether we've changed to a new parent, in which
// case we finalize this parent dir by recording its size and hash. Once a directory is
// finalized, we can update its row in current_files. The recorded time of a directory
// should be the max of all children recorded times.
let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new();
// Find latest entries in filedir_version that are not deleted, and that do not appear in
// current_files, but whose parents _do_ appear. Create new versions marking these as deleted.
// For each entry in current_files, create a new filedir_version.
// Find all entries in current_files that declare parents that are not None and are not in
// current_files. Each of these parents must be added with a new version. The content_hash must
// be derived from the existing and new file versions, which should be represented in
// filedir_version at this point. Add these in depth-first order.
// Drop the temporary current_files table
Ok(())
} }

View File

@ -1,7 +1,8 @@
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use rusqlite::{Connection, OpenFlags}; use rusqlite::{Connection, OpenFlags};
use uuid::{Uuid};
use std::path::PathBuf; use std::path::{Path, PathBuf};
use std::process; use std::process;
// Composable provenance tracking for scientific data analysis // Composable provenance tracking for scientific data analysis
@ -9,17 +10,24 @@ use std::process;
#[command(author, version, about, long_about = None, arg_required_else_help=true)] #[command(author, version, about, long_about = None, arg_required_else_help=true)]
struct Cli { struct Cli {
#[command(subcommand)] #[command(subcommand)]
command: Option<Commands>, command: Option<Command>,
} }
#[derive(Subcommand)] #[derive(Subcommand)]
enum Commands { enum Command {
/// Initialize a new dataset
#[command()]
Init {
/// A short descriptive name for the dataset
#[arg(short, long)]
name: String,
/// The top level directory of the dataset (must be a directory)
#[arg(default_value=".")]
dataset_path: PathBuf,
},
/// Record changes to files/directories within a dataset (or create a new dataset) /// Record changes to files/directories within a dataset (or create a new dataset)
#[command()] #[command()]
Record { Record {
/// If not within an existing dataset, create one
#[arg(short, long)]
initialize: bool,
/// A short descriptive message for this recording, i.e. "Re-run with lr=1e-3" /// A short descriptive message for this recording, i.e. "Re-run with lr=1e-3"
#[arg(short, long)] #[arg(short, long)]
message: String, message: String,
@ -35,54 +43,8 @@ enum Commands {
}, },
} }
fn main() { fn init_schema(conn: &mut Connection) -> Uuid {
env_logger::init(); match nancy::db::init(conn) {
let args = Cli::parse();
match &args.command {
Some(Commands::Hello {}) => {
println!("Hello from nancy (binary)!");
match nancy::print_uuid() {
Ok(_) => {
println!("OK");
}
Err(e) => {
println!("SQLite error: {}", e);
}
};
}
Some(Commands::Record {
initialize,
message,
record_paths,
}) => {
// If no paths are given, use ["."] for the following steps.
// Determine dataset dir (ds_dir)
let mut conn = match nancy::db::find_dataset_dir(record_paths) {
Err(nancy::db::FindDatasetError::NoDataset(path)) => {
log::info!("No dataset at or above nearest ancestor path: {:?}", path);
if !initialize {
log::error!(
"Refusing to initialize a new dataset at {path:?}. \
Pass the -i or --initialize flag to request initialization."
);
process::exit(1);
}
let dbpath = &path.join("nancy.db");
log::info!("Initializing new database at {:?}", dbpath);
let mut c = match Connection::open(dbpath) {
Err(e) => {
log::error!(
"Could not open new SQLite database at {dbpath:?}: {:?}",
e
);
process::exit(1);
}
Ok(cc) => cc,
};
c.pragma_update(None, "foreign_keys", &"ON").unwrap();
match nancy::db::init(&mut c) {
Err(e) => { Err(e) => {
log::error!("Encountered error in initializing schema: {:?}", e); log::error!("Encountered error in initializing schema: {:?}", e);
process::exit(1); process::exit(1);
@ -93,10 +55,10 @@ fn main() {
// Run an empty program so that the dataset log reflects when it was // Run an empty program so that the dataset log reflects when it was
// initialized // initialized
nancy::program::with_program( nancy::program::with_program(
&mut c, conn,
"INIT", "INIT",
"Initialize dataset", "Initialize dataset",
|prog| { |prog, _tx| {
let _ = prog.perform_task(&[], |task| { let _ = prog.perform_task(&[], |task| {
log::debug!("INIT task UUID is {}", task.uuid); log::debug!("INIT task UUID is {}", task.uuid);
}); });
@ -105,9 +67,89 @@ fn main() {
}, },
) )
.expect("Empty program should not throw error"); .expect("Empty program should not throw error");
dataset_uuid
} }
} }
c }
/// Run a "RECORD" program that records `paths` for the dataset at `dataset_path`.
///
/// `message` is used both as the program description and as the recording
/// message. If the program returns an error, it is logged and the process exits
/// with status 1.
fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) {
    let outcome = nancy::program::with_program(conn, "RECORD", message, |prog, tx| {
        prog.perform_task(&[], |_task| {
            // Note that this may fail, in which case we should roll back only this program
            // but keep the dataset initialized.
            nancy::fs::record(tx, paths, dataset_path, message)
        })
    });
    if let Err(e) = outcome {
        log::error!("Encountered error in RECORD program: {:?}", e);
        process::exit(1);
    }
}
fn main() {
env_logger::init();
let args = Cli::parse();
match &args.command {
Some(Command::Hello {}) => {
println!("Hello from nancy (binary)!");
match nancy::print_uuid() {
Ok(_) => {
println!("OK");
}
Err(e) => {
println!("SQLite error: {}", e);
}
};
}
Some(Command::Init {
name,
dataset_path
}) => {
if !dataset_path.is_dir() {
log::error!("Path {:?} does not point to an existing directory", dataset_path);
process::exit(1);
}
let dbpath = &dataset_path.join("nancy.db");
if dbpath.exists() {
log::error!("Database {:?} exists, indicating this dataset is already \
initialized. Refusing to overwrite.", dbpath);
process::exit(1);
}
log::info!("Initializing new database at {:?}", dbpath);
let mut conn = match Connection::open(dbpath) {
Err(e) => {
log::error!(
"Could not open new SQLite database at {dbpath:?}: {:?}",
e
);
process::exit(1);
}
Ok(cc) => cc,
};
conn.pragma_update(None, "foreign_keys", &"ON").unwrap();
let u = init_schema(&mut conn);
do_record(
&mut conn,
&format!("Initial record of dataset {:?}", u),
&[dataset_path.to_path_buf()],
dataset_path,
);
}
Some(Command::Record {
message,
record_paths,
}) => {
// If no paths are given, use ["."] for the following steps.
// Determine dataset dir (ds_dir)
match nancy::db::find_dataset_dir(record_paths) {
Err(nancy::db::FindDatasetError::NoDataset(path)) => {
log::info!("No dataset at or above nearest ancestor path: {:?}", path);
log::error!(
"Refusing to initialize a new dataset at {path:?}. \
Use `nancy init` to create a dataset instead."
);
process::exit(1);
} }
Err(e) => { Err(e) => {
log::error!("Could not determine dataset directory: {:?}", e); log::error!("Could not determine dataset directory: {:?}", e);
@ -118,7 +160,7 @@ fn main() {
log::info!("Found existing dataset at path: {:?}", path); log::info!("Found existing dataset at path: {:?}", path);
let dbpath = &path.join("nancy.db"); let dbpath = &path.join("nancy.db");
// open with flags to prevent creating when we believe the db exists // open with flags to prevent creating when we believe the db exists
let mut c = match Connection::open_with_flags( let mut conn = match Connection::open_with_flags(
dbpath, dbpath,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
) { ) {
@ -130,8 +172,8 @@ fn main() {
} }
Ok(cc) => cc, Ok(cc) => cc,
}; };
c.pragma_update(None, "foreign_keys", &"ON").unwrap(); conn.pragma_update(None, "foreign_keys", &"ON").unwrap();
match nancy::db::ensure_schema(&mut c) { match nancy::db::ensure_schema(&mut conn) {
Err(e) => { Err(e) => {
log::error!("Error ensuring schema: {}", e); log::error!("Error ensuring schema: {}", e);
process::exit(1); process::exit(1);
@ -140,23 +182,12 @@ fn main() {
log::debug!("Schema ensured: {:?}", update_result); log::debug!("Schema ensured: {:?}", update_result);
} }
} }
c do_record(&mut conn, message, record_paths.as_slice(), &path);
} }
}; };
if let Err(e) = nancy::program::with_program(&mut conn, "RECORD", message, |prog| {
prog.perform_task(&[], |_task| {
let dataset_path = PathBuf::from(".");
// Note that this may fail, in which case we should roll back only this program
// but keep the dataset initialized.
nancy::fs::record(&dataset_path, message)
})
}) {
log::error!("Encountered error in RECORD program: {:?}", e);
process::exit(1);
};
} }
Some(Commands::Status {}) => { Some(Command::Status {}) => {
println!("status not yet implemented"); println!("status not yet implemented");
} }
None => {} None => {}

View File

@ -1,7 +1,6 @@
use log; use log;
use rusqlite::{Connection, Error as RSQLError, Transaction}; use rusqlite::{Connection, Error as RSQLError, Transaction};
extern crate derive_more; extern crate derive_more;
use derive_more::From;
use uuid::Uuid; use uuid::Uuid;
use std::time::{Instant, SystemTime}; use std::time::{Instant, SystemTime};
@ -31,6 +30,7 @@ impl Program {
pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Program { pub fn new(tx: &mut Transaction, name: &str, message: &str) -> Program {
let u = Uuid::new_v4(); let u = Uuid::new_v4();
log::debug!("New {} Program with UUID {}", name, u); log::debug!("New {} Program with UUID {}", name, u);
// TODO: Insert this program into the database
Program { Program {
uuid: u, uuid: u,
name: name.to_string(), name: name.to_string(),
@ -67,7 +67,7 @@ impl<E> From<E> for ProgramError<E> {
} }
/// Run a closure as a program, within a database transaction /// Run a closure as a program, within a database transaction
pub fn with_program<E, F: FnOnce(Program) -> Result<(), E>>( pub fn with_program<E, F: FnOnce(Program, &mut Transaction) -> Result<(), E>>(
conn: &mut Connection, conn: &mut Connection,
name: &str, name: &str,
message: &str, message: &str,
@ -95,7 +95,14 @@ pub fn with_program<E, F: FnOnce(Program) -> Result<(), E>>(
log::debug!(target: log_target, "Starting timers"); log::debug!(target: log_target, "Starting timers");
let start_st = SystemTime::now(); let start_st = SystemTime::now();
let start = Instant::now(); let start = Instant::now();
let start_stamp = timing::persistent_stamp(Instant::now(), start, start_st); let start_stamp = timing::persistent_stamp(start, start, start_st);
log::debug!(
target: log_target,
"Start time: {:?} f64 timestamp={:#?}",
start_st,
start_stamp
);
// Instantiate Program // Instantiate Program
// (record name and message for new program, get program ID) // (record name and message for new program, get program ID)
@ -104,17 +111,11 @@ pub fn with_program<E, F: FnOnce(Program) -> Result<(), E>>(
log::info!(target: log_target, "Running {:?}", prog); log::info!(target: log_target, "Running {:?}", prog);
// run closure with program argument // run closure with program argument
f(prog)?; // if closure fails, transaction will be rolled back f(prog, &mut tx)?; // if closure fails, transaction will be rolled back
// end timer // end timer
// Commit scope for RECORD program // Commit scope for RECORD program
let end = Instant::now(); let end = Instant::now();
log::debug!(
target: log_target,
"Start time: {:?} f64 timestamp={:#?}",
start_st,
start_stamp
);
log::debug!( log::debug!(
target: log_target, target: log_target,
"Elapsed: {} seconds", "Elapsed: {} seconds",
@ -122,6 +123,7 @@ pub fn with_program<E, F: FnOnce(Program) -> Result<(), E>>(
); );
// record end time // record end time
// commit transaction // commit transaction
log::debug!(target: log_target, "Committing transaction and finalizing program");
match tx.commit() { match tx.commit() {
Err(e) => Err(ProgramError::CommitFailed(e)), Err(e) => Err(ProgramError::CommitFailed(e)),
Ok(_) => Ok(()), Ok(_) => Ok(()),