From 5151703dcc20637b0ffe5ba164e70b51e4dcef4a Mon Sep 17 00:00:00 2001
From: Jacob Hinkle
Date: Wed, 16 Nov 2022 09:55:26 -0500
Subject: [PATCH] Refactor record() a bit to clearly state steps, with stubs

---
 src/fs.rs   | 98 +++++++++++++++++++++++++++++++++++++++++------------
 src/main.rs |  4 +--
 2 files changed, 78 insertions(+), 24 deletions(-)

diff --git a/src/fs.rs b/src/fs.rs
index 01eb8d2..cdbf081 100644
--- a/src/fs.rs
+++ b/src/fs.rs
@@ -284,12 +284,73 @@ fn insert_file_content_hashes(tx: &Transaction) -> Result<(), RecordError> {
     Ok(())
 }
 
+/// Find latest entries in filedir_version that are not deleted, and that do not appear in
+/// current_files, but whose parents _do_ appear. Create new versions marking these as deleted.
+fn find_deleted(tx: &Transaction, p: &Path, path_key: Hash256) -> Result<(), RecordError> {
+    // Do a CTE from filedir to get key and relpath for all files below p (inclusive)
+
+    // For files in filedir but not in current_files, insert entries into temp table deleted_files
+    // marking them as deleted.
+
+    log::warn!("find_deleted not yet implemented");
+    Ok(())
+}
+
+/// Remove rows from deleted_files which were previously deleted in the database
+fn prune_deleted(tx: &Transaction) -> Result<(), RecordError> {
+    // Join filedir_version to deleted_files for latest deleted status; drop rows already deleted
+
+    log::warn!("prune_deleted not yet implemented");
+    Ok(())
+}
+
+/// Compute directory hashes in the current_files table
+fn compute_current_directory_hashes(tx: &Transaction) -> Result<(), RecordError> {
+    // Extract files with a CTE on current_files, in alphabetical order. For each row, if it's not
+    // a directory compare parent column to see whether we've changed to a new parent, in which
+    // case we finalize this parent dir by recording its size and hash. Once a directory is
+    // finalized, we can update its row in current_files. The recorded time of a directory should
+    // be the max of all children recorded times.
+    let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new();
+
+    // Find all entries in current_files that declare parents that are not None and are not in
+    // current_files. Each of these parents must be added with a new version. The content_hash must
+    // be derived from the existing and new file versions, which should be represented in
+    // filedir_version at this point. Add these in depth-first order.
+
+    log::warn!("compute_current_directory_hashes not yet implemented");
+    Ok(())
+}
+
+/// Persist current_files into filedir and filedir_version (deleted_files is not yet transferred)
+fn persist_temp_tables(tx: &Transaction, task_uuid: Uuid, dataset_uuid: Uuid) -> Result<(), RecordError> {
+    tx.execute("
+        INSERT OR IGNORE INTO
+            filedir
+        SELECT
+            sha256, ?, name, parent
+        FROM current_files
+    ",
+        (dataset_uuid.as_bytes(),),
+    )?;
+    tx.execute("
+        INSERT INTO
+            filedir_version
+        SELECT
+            version_uuid, sha256, recorded_time, filetype, FALSE, symlink_target, content_sha256, ?
+        FROM current_files
+    ",
+        (task_uuid.as_bytes(),),
+    )?;
+    Ok(())
+}
+
 pub fn record(
     tx: &Transaction,
     paths: &[PathBuf],
     ds_root: &Path,
     message: &str,
-    //task_uuid: Uuid,
+    task_uuid: Uuid,
 ) -> Result<(), RecordError> {
     log::info!(
         "Recording path {:?} for dataset at {:?} with user-provided message \"{}\"",
@@ -319,9 +380,7 @@ pub fn record(
 
     // This schema is like filedir joined with filedir_version
     // TODO: revert this to a TEMP TABLE after it's debugged
-    tx.execute("CREATE
-        -- TEMP
-        TABLE current_files (
+    tx.execute("CREATE TEMP TABLE current_files (
             id INTEGER PRIMARY KEY NOT NULL, -- only used in this table
             sha256 BLOB NOT NULL, -- will become the primary key on filedir
             name TEXT NOT NULL, -- filename without path
@@ -366,30 +425,25 @@ pub fn record(
             prev_key = Hash256(hashbuf);
         }
 
-        walk_and_insert(tx, &p, prev_key)?; // SINGLE-THREADED
+        walk_and_insert(tx, &p, prev_key)?;
+
+        // Find deleted files ONLY under the paths we are given
+        find_deleted(tx, &p, prev_key)?;
     }
-
-    insert_file_content_hashes(tx)?; // MULTI-THREADED
 
-    // SINGLE-THREADED
-    // Extract files from current_files, in insertion order. For each row, if it's not a
-    // directory compare parent column to see whether we've changed to a new parent, in which
-    // case we finalize this parent dir by recording its size and hash. Once a directory is
-    // finalized, we can update its row in current_files. The recorded time of a directory
-    // should be the max of all children recorded times.
-    let mut dirstack: Vec<(Hash256, usize, Context, f64)> = Vec::new();
+    // MULTI-THREADED hash computation of all found files
+    insert_file_content_hashes(tx)?;
 
-    // Find latest entries in filedir_version that are not deleted, and that do not appear in
-    // current_files, but whose parents _do_ appear. Create new versions marking these as deleted.
-
-    // For each entry in current_files, create a new filedir_version.
+    // after we've inserted all paths, prune deleted files to prevent double-recording deletions
+    prune_deleted(tx)?;
 
-    // Find all entries in current_files that declare parents that are not None and are not in
-    // current_files. Each of these parents must be added with a new version. The content_hash must
-    // be derived from the existing and new file versions, which should be represented in
-    // filedir_version at this point. Add these in depth-first order.
+    compute_current_directory_hashes(tx)?;
+
+    persist_temp_tables(tx, task_uuid, u)?;
 
     // Drop the temporary current_files table
+    log::debug!("Dropping temp table current_files");
+    tx.execute("DROP TABLE current_files", ())?;
 
     Ok(())
 }
diff --git a/src/main.rs b/src/main.rs
index 8fd781d..781bde9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -77,10 +77,10 @@ fn init_schema(conn: &mut Connection, name: &str) -> Uuid {
 
 fn do_record(conn: &mut Connection, message: &str, paths: &[PathBuf], dataset_path: &Path) -> () {
     if let Err(e) = nancy::program::with_program(conn, "RECORD", message, |prog| {
-        prog.perform_task(&[], |_task| {
+        prog.perform_task(&[], |task| {
             // Note that this may fail, in which case we should roll back only this program
             // but keep the dataset initialized.
-            nancy::fs::record(prog.transaction, paths, &dataset_path, message)
+            nancy::fs::record(prog.transaction, paths, &dataset_path, message, task.key)
         })
     }) {
         log::error!("Encountered error in RECORD program: {:?}", e);