Run "black" on all python code

This commit is contained in:
Jacob Hinkle 2022-09-28 12:21:49 -04:00
parent e4b380b2c1
commit b72051ff75
13 changed files with 514 additions and 416 deletions

View File

@ -29,21 +29,27 @@ def version():
@click.group(
cls=AliasedGroup,
help=f"Composable provenance tracking for scientific data")
cls=AliasedGroup, help=f"Composable provenance tracking for scientific data"
)
@click.option(
"-L", "--log_level",
"-L",
"--log_level",
# https://loguru.readthedocs.io/en/stable/api/logger.html#levels
type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'SUCCESS', 'INFO', 'DEBUG', 'TRACE']),
default='SUCCESS',
help='If given, print all output including debugging info.',
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "SUCCESS", "INFO", "DEBUG", "TRACE"]
),
default="SUCCESS",
help="If given, print all output including debugging info.",
)
def main(log_level):
import sys
logger.remove()
logger.add(sys.stderr, level=log_level)
# main.add_command(freeze)
# main.add_command(thaw)
main.add_command(diff.diff_cli, name='diff')
main.add_command(record.record_cli, name='record')
main.add_command(diff.diff_cli, name="diff")
main.add_command(record.record_cli, name="record")
main.add_command(version)

View File

@ -1,15 +1,14 @@
def confirm(question, default_no=False):
"""Ask a question and wait for a Y/N response."""
choices = ' [y/N]: ' if default_no else ' [Y/n]: '
default_answer = 'n' if default_no else 'y'
choices = " [y/N]: " if default_no else " [Y/n]: "
default_answer = "n" if default_no else "y"
while True:
raw_reply = str(input(question + choices))
reply = raw_reply.lower().strip() or default_answer
if reply[0] == 'y':
if reply[0] == "y":
return True
elif reply[0] == 'n':
elif reply[0] == "n":
return False
else:
print("Unrecognized input:", reply)

View File

@ -9,8 +9,9 @@ import sys
import warnings
def print_diff(ABdiff: fs.FSDiff, indent=2, indent_level=0, use_color=True,
show_hashes=False):
def print_diff(
ABdiff: fs.FSDiff, indent=2, indent_level=0, use_color=True, show_hashes=False
):
"""Pretty print an FSDiff object"""
if use_color:
try:
@ -20,38 +21,41 @@ def print_diff(ABdiff: fs.FSDiff, indent=2, indent_level=0, use_color=True,
use_color = False
changetags = dict(
NEW=Fore.GREEN + 'NEW' + Style.RESET_ALL if use_color else 'NEW',
DEL=Fore.RED + 'DEL' + Style.RESET_ALL if use_color else 'DEL',
MOD=Fore.YELLOW + 'MOD' + Style.RESET_ALL if use_color else 'MOD',
NEW=Fore.GREEN + "NEW" + Style.RESET_ALL if use_color else "NEW",
DEL=Fore.RED + "DEL" + Style.RESET_ALL if use_color else "DEL",
MOD=Fore.YELLOW + "MOD" + Style.RESET_ALL if use_color else "MOD",
)
filetypecolors = dict(
DIR=Fore.BLUE if use_color else '',
REG='',
LNK=Fore.CYAN if use_color else '',
DIR=Fore.BLUE if use_color else "",
REG="",
LNK=Fore.CYAN if use_color else "",
)
reset = Style.RESET_ALL if use_color else ''
hashcolor = Fore.MAGENTA if use_color else ''
reset = Style.RESET_ALL if use_color else ""
hashcolor = Fore.MAGENTA if use_color else ""
def _print_row(tag, entry, level):
relpath = entry.relpath
# Format relpath using filetype-based colors
dname, fname = os.path.split(relpath)
if fname == '': # root directory leads to empty fname here
dirstr = (filetypecolors['DIR'] + '<root>' + reset)
if fname == "": # root directory leads to empty fname here
dirstr = filetypecolors["DIR"] + "<root>" + reset
else:
dirstr = (filetypecolors['DIR'] + dname + '/' + reset) \
if dname != '' else ''
fname = filetypecolors.get(entry.filetype, '') + fname + reset
dirstr = (
(filetypecolors["DIR"] + dname + "/" + reset) if dname != "" else ""
)
fname = filetypecolors.get(entry.filetype, "") + fname + reset
if entry.filetype == 'LNK': # append symlink target
fname += ' -> ' + entry.symlink_target
if entry.filetype == "LNK": # append symlink target
fname += " -> " + entry.symlink_target
relpath = dirstr + fname
hashchange = (hashcolor + entry.sha256.hex() + reset +
' ' + changetags[tag]) \
if show_hashes else changetags[tag]
hashchange = (
(hashcolor + entry.sha256.hex() + reset + " " + changetags[tag])
if show_hashes
else changetags[tag]
)
print(
hashchange,
@ -62,31 +66,33 @@ def print_diff(ABdiff: fs.FSDiff, indent=2, indent_level=0, use_color=True,
for l, d in ABdiff.flatten_tree():
if d.A is None:
assert d.B is not None
_print_row('NEW', d.B, l)
_print_row("NEW", d.B, l)
elif d.B is None:
_print_row('DEL', d.A, l)
_print_row("DEL", d.A, l)
elif d.A.sha256 != d.B.sha256:
_print_row('MOD', d.B, l)
_print_row("MOD", d.B, l)
@click.command()
@click.option(
'-H', "--show-hashes",
"-H",
"--show-hashes",
is_flag=True,
help='If given, prepend each line in the diff with the new file hash (SHA256).',
help="If given, prepend each line in the diff with the new file hash (SHA256).",
)
@click.option(
"--no-color",
is_flag=True,
help='If given, do not print any color output.',
help="If given, do not print any color output.",
)
@click.option(
"-s", "--store",
"-s",
"--store",
type=str,
default=None,
help='Top-level of store. If omitted, use closest common parent directory '
'of given paths. If given the path to a non-store directory, a new '
'store is initialized there.',
help="Top-level of store. If omitted, use closest common parent directory "
"of given paths. If given the path to a non-store directory, a new "
"store is initialized there.",
)
@logger.catch
def diff_cli(show_hashes, no_color, store):
@ -122,4 +128,3 @@ def diff_cli(show_hashes, no_color, store):
show_hashes=show_hashes,
use_color=not no_color,
)

View File

@ -9,9 +9,16 @@ from .diff import print_diff
import os
import sys
@logger.catch
def record(message, store_path=None, show_diff=True, show_hashes=False, use_color=True,
skip_confirm=False):
def record(
message,
store_path=None,
show_diff=True,
show_hashes=False,
use_color=True,
skip_confirm=False,
):
"""Unwrapped record command"""
if store_path is None:
@ -21,8 +28,7 @@ def record(message, store_path=None, show_diff=True, show_hashes=False, use_colo
if store_path is None: # If no store found, assume we're creating here
store_path = curdir
if not os.path.exists(os.path.join(store_path, 'nancy.db')):
if not os.path.exists(os.path.join(store_path, "nancy.db")):
# this is a new store
logger.info(f"Initializing new store in {store_path}...")
s = store.Store.init(store_path)
@ -34,7 +40,7 @@ def record(message, store_path=None, show_diff=True, show_hashes=False, use_colo
if show_diff:
print_diff(fsdiff, show_hashes=show_hashes, use_color=use_color)
logger.info('Recording with message: {}', message)
logger.info("Recording with message: {}", message)
if skip_confirm or confirm("Record the values above into the database?"):
s.record(fsdiff, message=message)
@ -45,33 +51,39 @@ def record(message, store_path=None, show_diff=True, show_hashes=False, use_colo
@click.command()
@click.option(
'-H', "--show-hashes",
"-H",
"--show-hashes",
is_flag=True,
help='If given, prepend each line in the diff with the new file hash (SHA256).',
help="If given, prepend each line in the diff with the new file hash (SHA256).",
)
@click.option(
"--no-color",
is_flag=True,
help='If given, do not print any color output.',
help="If given, do not print any color output.",
)
@click.option(
"-m", "--message",
"-m",
"--message",
type=str,
required=True,
help='A user-defined descriptive message for this recording operation.',
help="A user-defined descriptive message for this recording operation.",
)
@click.option(
"-s", "--store",
"-s",
"--store",
type=str,
default=None,
help='Top-level of store. If omitted, use closest common parent directory '
'of given paths. If given the path to a non-store directory, a new '
'store is initialized there.',
help="Top-level of store. If omitted, use closest common parent directory "
"of given paths. If given the path to a non-store directory, a new "
"store is initialized there.",
)
def record_cli(show_hashes, no_color, message, store):
"""
Initialize tracking or record changes to a tracked directory.
"""
record(message=message, show_hashes=show_hashes, use_color=not
no_color, store_path=store)
record(
message=message,
show_hashes=show_hashes,
use_color=not no_color,
store_path=store,
)

View File

@ -11,13 +11,17 @@ schema_version = 0
min_sqlite_version = (3, 24, 0)
sqlite_version = sqlite3.sqlite_version_info
sqlite_verstr = '.'.join(str(v) for v in sqlite_version)
min_sqlite_verstr = '.'.join(str(v) for v in min_sqlite_version)
sqlite_verstr = ".".join(str(v) for v in sqlite_version)
min_sqlite_verstr = ".".join(str(v) for v in min_sqlite_version)
(major, sub, minor) = sqlite_version
if major < min_sqlite_version[0] or \
sub < min_sqlite_version[1] or \
minor < min_sqlite_version[2]:
warning.warn(f"Minimum sqlite version is {min_sqlite_verstr}. Found {sqlite_verstr}")
if (
major < min_sqlite_version[0]
or sub < min_sqlite_version[1]
or minor < min_sqlite_version[2]
):
warning.warn(
f"Minimum sqlite version is {min_sqlite_verstr}. Found {sqlite_verstr}"
)
def init_schema(cur):

View File

@ -7,6 +7,7 @@ import platform
import sys
import time
class Environment(NamedTuple):
id: int
envvars_json: str
@ -26,7 +27,8 @@ class Environment(NamedTuple):
env = env._replace(user=u.id)
# insert or ignore, handle each case to set id
cur.execute('''
cur.execute(
"""
SELECT
id
FROM
@ -38,14 +40,15 @@ class Environment(NamedTuple):
python_hexversion = ? AND
user = ?
LIMIT 1
''',
""",
env[1:],
)
res = cur.fetchone()
if res is None:
cur.execute('''
cur.execute(
"""
INSERT INTO environment VALUES (?,?,?,?,?,?);
''',
""",
env,
)
id = cur.lastrowid

View File

@ -28,7 +28,9 @@ def remove_write_perms(path):
# can't stat this thing directly on this platform
# means we can only stat the content.
# In this case, we return None and do not lock this link
warnings.warn("This platform cannot stat symlinks. Will not set them read-only.")
warnings.warn(
"This platform cannot stat symlinks. Will not set them read-only."
)
return None
s = os.stat(path)
orig_perm_string = stat.filemode(s.st_mode)
@ -68,8 +70,9 @@ def make_readonly_recursive(path, excluded=[]):
@dataclass
class FSEntryVersion:
"""A version of a file or directory."""
id: int
filedir: 'FSEntry'
filedir: "FSEntry"
recorded_time: datetime # When was this version recorded?
filetype: str # One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
deleted: bool # set True when recording a deleted file
@ -90,19 +93,20 @@ class FSEntryVersion:
datetime.fromtimestamp(row[2]),
*row[3:-2],
bytes.fromhex(row[-2]),
row[-1]
row[-1],
)
@dataclass
class FSEntry:
"""A hashed file or directory."""
id: int # defaults to None
filename: str # with parent directory stripped. None if this is the root
relpath: str # relative to some root directory
parent: 'FSEntry' # upward link
parent: "FSEntry" # upward link
# children for dirs only: non-recursive; files/dirs at this level only
children: List['FSEntry']
children: List["FSEntry"]
filetype: str # regular, symlink, special (block, char, pipe, or socket)
deleted: bool
versions: List[FSEntryVersion] = None
@ -122,8 +126,9 @@ class FSEntry:
self.sha256 = self.latest_version.sha256
@classmethod
def from_path(cls, root, relpath=None, exclude=['nancy.db'], parent=None,
direntry=None):
def from_path(
cls, root, relpath=None, exclude=["nancy.db"], parent=None, direntry=None
):
"""
Scan a path to instantiate (recursive).
@ -140,32 +145,36 @@ class FSEntry:
else:
path = os.path.join(root, relpath)
filestat = os.lstat(path) if direntry is None else direntry.stat(follow_symlinks=False)
filestat = (
os.lstat(path) if direntry is None else direntry.stat(follow_symlinks=False)
)
s = filestat.st_mode
children = []
symlink_target = None
if os.path.islink(path):
# Check links first, since it is not exclusive with dir or file checks
filetype = 'LNK'
filetype = "LNK"
symlink_target = os.readlink(path)
m.update(bytes(symlink_target, 'utf-8'))
m.update(bytes(symlink_target, "utf-8"))
elif stat.S_ISDIR(s):
filetype = 'DIR'
filetype = "DIR"
# this prevents a directory's hash from colliding with a file hash
# in cases where it only holds a single file
if relpath is not None:
m.update(bytes(relpath, 'utf-8'))
m.update(bytes(relpath, "utf-8"))
# we use os.scandir which returns a DirEntry for each child
# excluding "." and "..". These variables hold a .stat which we can
# use to avoid having to query the filesystem twice.
direntries = list(os.scandir(path))
direntries.sort(key=operator.attrgetter('name'))
direntries.sort(key=operator.attrgetter("name"))
childrenrelpaths = ((e.name if relpath is None else
os.path.join(relpath, e.name)) for e in direntries)
childrenrelpaths = (
(e.name if relpath is None else os.path.join(relpath, e.name))
for e in direntries
)
children = [
cls.from_path(
@ -181,34 +190,34 @@ class FSEntry:
# changes without modifying the hashes of individual files,
# which remain content-based for compatibility with
# other tools
m.update(bytes(c.unfrozen_perms, 'utf-8'))
m.update(bytes(c.unfrozen_perms, "utf-8"))
m.update(c.sha256)
elif stat.S_ISREG(s):
filetype = 'REG'
m.update(open(path, 'rb').read())
filetype = "REG"
m.update(open(path, "rb").read())
elif stat.S_ISSOCK(s):
filetype = 'SOCK'
filetype = "SOCK"
elif stat.S_ISCHR(s):
filetype = 'CHR'
filetype = "CHR"
elif stat.S_ISBLK(s):
filetype = 'BLK'
filetype = "BLK"
elif stat.S_ISFIFO(s):
filetype = 'FIFO'
filetype = "FIFO"
elif stat.S_ISDOOR(s):
filetype = 'DOOR'
filetype = "DOOR"
elif stat.S_ISPORT(s):
filetype = 'PORT'
filetype = "PORT"
elif stat.S_ISWHT(s):
filetype = 'WHT'
filetype = "WHT"
else:
filetype = 'OTHER'
filetype = "OTHER"
sha256 = m.digest()
ob = cls(
id=None,
filename='.' if relpath is None else os.path.basename(relpath),
relpath='.' if relpath is None else relpath,
filename="." if relpath is None else os.path.basename(relpath),
relpath="." if relpath is None else relpath,
parent=parent,
children=children,
filetype=None,
@ -246,12 +255,12 @@ class FSEntry:
"""Just a standardized value indicating an empty root directory"""
return cls(
id=None,
filename='.',
relpath='.',
filename=".",
relpath=".",
parent=None,
children=[],
filetype='DIR',
unfrozen_perms='----------',
filetype="DIR",
unfrozen_perms="----------",
sha256=hashlib.sha256().digest(),
deleted=False,
)
@ -263,7 +272,7 @@ class FSEntry:
if root_row is None:
assert root_id is not None
cursor.execute(
'SELECT id, name, frozen FROM filedir WHERE id=?',
"SELECT id, name, frozen FROM filedir WHERE id=?",
(root_id,),
)
root_row = cursor.fetchone()
@ -285,19 +294,25 @@ class FSEntry:
versions=[],
)
cursor.execute(f'''
cursor.execute(
f"""
SELECT id, name, frozen
FROM filedir
WHERE parent=?
''', (root_id,))
""",
(root_id,),
)
rows = cursor.fetchall()
ob.children = [cls.from_db_index(cursor, root_row=r, parent=ob) for r in rows]
# get all versions
fields = ('')
cursor.execute(f'''
fields = ""
cursor.execute(
f"""
SELECT * FROM filedir_version WHERE filedir=? ORDER BY recorded_time
''', (root_id,))
""",
(root_id,),
)
matches = cursor.fetchall()
versions = [FSEntryVersion.from_row(row, filedir=ob) for row in matches]
@ -312,7 +327,6 @@ class FSEntry:
return ob
def flatten_tree(self, level=0):
"""Return list of all entries, with level, in pairs"""
pairs = [(level, self)]
@ -328,13 +342,15 @@ class FSEntry:
childsec = "[]"
else:
childstrs = [c.to_string(level=level + 1) for c in self.children]
childsep = '\n\n'
childsep = "\n\n"
childsec = childsep + childsep.join(c for c in childstrs)
# TODO: list versions in str()
# versions: [FSEntryVersion] = []
return '\n'.join((' ' * level) + l for l in f"""id: {self.id}
return "\n".join(
(" " * level) + l
for l in f"""id: {self.id}
filename: {self.filename}
relpath: {self.relpath}
parent (relpath): {'None' if self.parent is None else self.parent.relpath}
@ -344,7 +360,8 @@ unfrozen_perms: {self.unfrozen_perms}
symlink_target: {self.symlink_target}
sha256: {self.sha256.hex()}
children: {childsec}
""".splitlines())
""".splitlines()
)
def sort_diffs_filename(diffs):
@ -356,20 +373,22 @@ def sort_diffs_filename(diffs):
class FSDiff:
A: FSEntry # record the comparisons
B: FSEntry # a missing entry indicates new or deleted
modified_children: 'FSDiff'
modified_children: "FSDiff"
@staticmethod
def compare(A, B):
return A.sha256 == B.sha256 and \
A.unfrozen_perms == B.unfrozen_perms and \
A.filetype == B.filetype and \
A.deleted == B.deleted
return (
A.sha256 == B.sha256
and A.unfrozen_perms == B.unfrozen_perms
and A.filetype == B.filetype
and A.deleted == B.deleted
)
def filename(self):
return (self.B.filename if self.A is None else self.A.filename)
return self.B.filename if self.A is None else self.A.filename
def filetype(self):
return (self.B.filetype if self.A is None else self.A.filetype)
return self.B.filetype if self.A is None else self.A.filetype
@classmethod
def compute(cls, A, B):
@ -384,15 +403,23 @@ class FSDiff:
new (Directory): overlay with new entries from other
"""
if A is None: # new entry
return cls(A, B, [
cls.compute(None, c) \
return cls(
A,
B,
[
cls.compute(None, c)
for c in sorted(B.children, key=lambda e: e.filename)
])
],
)
if B is None: # deleted entry
return cls(A, B, [
cls.compute(c, None) \
return cls(
A,
B,
[
cls.compute(c, None)
for c in sorted(A.children, key=lambda e: e.filename)
])
],
)
if cls.compare(A, B):
# no need to check descendents
@ -406,10 +433,13 @@ class FSDiff:
allnames = set(list(Alist.keys()) + list(Blist.keys()))
modified_children = [cls.compute(
modified_children = [
cls.compute(
Alist.get(n, None),
Blist.get(n, None),
) for n in allnames]
)
for n in allnames
]
return cls(A, B, modified_children)

View File

@ -3,6 +3,7 @@ import json
import platform
import time
class Machine(NamedTuple):
id: int
machine_id: str
@ -23,7 +24,8 @@ class Machine(NamedTuple):
machine = cls.detect()
# insert or ignore, handle each case to set id
cur.execute('''
cur.execute(
"""
SELECT
id
FROM
@ -40,14 +42,15 @@ class Machine(NamedTuple):
win32_ver = ? AND
mac_ver = ?
LIMIT 1
''',
machine[1:]
""",
machine[1:],
)
res = cur.fetchone()
if res is None:
cur.execute('''
cur.execute(
"""
INSERT INTO machine VALUES (?,?,?,?,?,?,?,?,?,?,?);
''',
""",
machine,
)
id = cur.lastrowid
@ -64,18 +67,18 @@ class Machine(NamedTuple):
Note that 'MachineInfo' objects are properly formatted to be inserted into
the `machine` table.
"""
fdor = ''
fdor = ""
try:
fdor = json.dumps(platform.freedesktop_os_release())
except AttributeError:
# freedesktop_os_release only available for python >= 3.10
fdor = ''
fdor = ""
system = platform.system()
mid = None
if system == 'Linux':
if system == "Linux":
try:
mid = open('/etc/machine-id', 'r').read()
mid = open("/etc/machine-id", "r").read()
except FileNotFoundError:
pass
@ -92,4 +95,3 @@ class Machine(NamedTuple):
win32_ver=json.dumps(platform.win32_ver()),
mac_ver=json.dumps(platform.mac_ver()),
)

View File

@ -29,21 +29,22 @@ class Program:
cur = self.store.conn.cursor()
env = environment.Environment.find_or_insert(cur)
cur.execute('INSERT INTO program VALUES (?, ?, ?, ?, ?, ?, ?)', (
cur.execute(
"INSERT INTO program VALUES (?, ?, ?, ?, ?, ?, ?)",
(
None, # id INTEGER PRIMARY KEY NOT NULL,
self.name, # name TEXT,
# name of the program, usually written lowercase by calling
# code e.g. cnn_crossval
# -- we use POSIX timestamps for time recording.
# -- e.g. datetime.datetime.now().timestamp()
None, # start_time REAL,
None, # end_time REAL,
os.getpid(), # process_id INTEGER, -- host PID of python process on host OS
env.id, # environment INTEGER NOT NULL,
self.message, # message TEXT, -- user-defined message to help distinguish similar runs
))
),
)
self.id = cur.lastrowid
self.set_start_time(datetime.datetime.now())
@ -64,7 +65,8 @@ class Program:
# record start and end times in store
cur = self.store.conn.cursor()
cur.execute('''
cur.execute(
"""
UPDATE
program
SET
@ -72,13 +74,15 @@ class Program:
end_time = ?
WHERE
id = ?
''',
""",
(self.start_time, end_time, self.id),
)
cur.connection.commit()
self._evaluated = True # prevent re-running
elapsed = end_time - self.start_time
logger.success(f"Program [{self.id}] {self.name} (message:{self.message}) ran in {elapsed} seconds.")
logger.success(
f"Program [{self.id}] {self.name} (message:{self.message}) ran in {elapsed} seconds."
)
class Store:
@ -93,7 +97,7 @@ class Store:
"""
if directory is None:
self.path = None
self.db_path = ':memory:'
self.db_path = ":memory:"
else:
self.path = Path(directory)
self.db_path = self.path / "nancy.db"
@ -119,13 +123,13 @@ class Store:
def init(cls, directory=None, message=None):
start_time = datetime.datetime.now()
if directory is None: # initialize an in-memory store
db_path = ':memory:'
db_path = ":memory:"
else:
if not os.path.isdir(directory):
raise FileNotFoundError(
f"Directory {directory} must exist before initializing a store there.",
)
db_path = os.path.join(directory, 'nancy.db')
db_path = os.path.join(directory, "nancy.db")
if os.path.isfile(db_path):
raise FileExistsError(
f"File {db_path} exists. Refusing to re-initialize",
@ -135,7 +139,7 @@ class Store:
db.init_schema(conn.cursor())
new_store = cls(directory, conn)
with new_store.program('INIT', message) as p:
with new_store.program("INIT", message) as p:
# set the timing to the actual times it took to initialize the db
p.set_start_time(start_time)
@ -143,16 +147,16 @@ class Store:
def make_readonly(self):
"""Make store directory read-only (except for nancy.db) and return file list"""
fs.make_readonly_recursive(self.path, excluded='./nancy.db')
fs.make_readonly_recursive(self.path, excluded="./nancy.db")
def filedir_root_index(self, cur=None):
"""Get the database id for the table entry in this store having name '.'"""
if cur is None:
cur = self.conn.cursor()
cur.execute('SELECT * FROM filedir')
cur.execute("SELECT * FROM filedir")
allfiledir = cur.fetchall()
cur.execute('SELECT id FROM filedir WHERE store=1 AND parent is NULL')
root_id, = cur.fetchone()
cur.execute("SELECT id FROM filedir WHERE store=1 AND parent is NULL")
(root_id,) = cur.fetchone()
return root_id
def path_to_fsentry(self, path):
@ -174,7 +178,7 @@ class Store:
for p in Path(rel).parts: # Path.parts splits a path reliably
# get child with that name
cur.execute(
'SELECT id, filetype FROM filedir WHERE filename=? AND parent=? LIMIT 1',
"SELECT id, filetype FROM filedir WHERE filename=? AND parent=? LIMIT 1",
(p, fd_id),
)
row = cur.fetchone()
@ -183,7 +187,7 @@ class Store:
return None
fd_id, filetype = row
if filetype != 'DIR':
if filetype != "DIR":
return fd_id
return fs.FSEntry.from_db_index(cur, root_id=fd_id)
@ -210,10 +214,9 @@ class Store:
return fs.FSDiff.compute(recorded, current)
def _record_file_version(self, cur, ob, filedir_id, source_task=None):
cur.execute(
'INSERT INTO filedir_version VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
"INSERT INTO filedir_version VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
None,
filedir_id,
@ -224,7 +227,7 @@ class Store:
ob.symlink_target,
ob.sha256.hex(),
source_task,
)
),
)
return cur.lastrowid
@ -238,17 +241,18 @@ class Store:
if len(res) == 0:
# create filedir entry and get its id
cur.execute(
'INSERT INTO filedir VALUES (?, ?, ?, ?, ?)',
"INSERT INTO filedir VALUES (?, ?, ?, ?, ?)",
(
None,
1,
ob.filename,
parent_id,
False,
))
),
)
thisid = cur.lastrowid
else:
thisid, = res[0]
(thisid,) = res[0]
self._record_file_version(cur, ob, thisid, source_task=source_task)
@ -256,31 +260,30 @@ class Store:
for c in ob.children:
self._record_new_file_recursive(c, cur, thisid, source_task)
def _record_recursive(self, diff, cur, parent_id=None, source_task=None):
"""Record this level of a diff."""
if diff.A is None:
self._record_new_file_recursive(diff.B, cur, parent_id,
source_task=source_task)
self._record_new_file_recursive(
diff.B, cur, parent_id, source_task=source_task
)
elif diff.B is None:
self._record_deleted_file_recursive(diff.B, cur, parent_id)
else:
# possibly modified, record new version then recurse into children
self._record_new_file_recursive(diff.B, cur, parent_id,
source_task=source_task)
self._record_file_version(cur, diff.B, diff.A.id,
source_task=source_task)
self._record_new_file_recursive(
diff.B, cur, parent_id, source_task=source_task
)
self._record_file_version(cur, diff.B, diff.A.id, source_task=source_task)
# descend into children
def record(self, diff, parent_id=None, message=None, cur=None):
if cur is None:
cur = self.conn.cursor()
with self.program('RECORD', message) as p:
with self.program("RECORD", message) as p:
# create a task for this operation
task_id = p.new_task('Store._record_recursive')
task_id = p.new_task("Store._record_recursive")
# descend the diff, tracking parent filedir IDs, creating them and
# recording new versions of each, when necessary

View File

@ -5,6 +5,7 @@ import os
import pwd
from typing import NamedTuple
class User(NamedTuple):
id: int # if not None, this is `id` in the `machine` table
username: str
@ -23,7 +24,8 @@ class User(NamedTuple):
user = user._replace(machine=m.id)
# insert or ignore, handle each case to set id
cur.execute('''
cur.execute(
"""
SELECT
id
FROM
@ -34,14 +36,15 @@ class User(NamedTuple):
fullname = ? AND
machine = ?
LIMIT 1
''',
""",
user[1:],
)
res = cur.fetchone()
if res is None:
cur.execute('''
cur.execute(
"""
INSERT INTO user VALUES (?,?,?,?,?);
''',
""",
user,
)
id = cur.lastrowid
@ -69,4 +72,3 @@ class User(NamedTuple):
fullname,
m.id,
)

View File

@ -1,4 +1,3 @@
from .db import schema_version
__version__ = "0.1.0"

View File

@ -11,10 +11,11 @@ import sqlite3
@pytest.fixture
def temp_db():
"""Create an in-memory database that follow's the nancy schema"""
with sqlite3.connect(':memory:') as conn:
with sqlite3.connect(":memory:") as conn:
cur = conn.cursor()
from nancy import db
db.init_schema(cur)
yield cur
@ -24,99 +25,108 @@ def temp_db():
def insert_machine(temp_db):
cur = temp_db
cur.executemany(
'INSERT INTO machine VALUES '
'(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
[(
"INSERT INTO machine VALUES " "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
[
(
None, # id INTEGER PRIMARY KEY NOT NULL,
'a5d97c08a15c4db69f5fded523a1bfe3', #machine_id TEXT, -- platform-dependent unique hardware id
'lucky', #hostname TEXT, -- platform.node(): 'lucky'
'', #processor TEXT, -- platform.processor():
'Linux', #system TEXT, -- platform.system(): 'Linux'
'5.15.64', #release TEXT, -- platform.release(): '5.15.64'
'aarch64', #machine TEXT, -- platform.machine(): 'x86_64'
'EDT', #timezone TEXT, -- timezone, for interpreting event times
'', #freedesktop_os_release TEXT, -- requires python 3.10
'', #win32_ver TEXT, -- platform.win32_ver() as JSON
'', #mac_ver TEXT -- platform.mac_ver() as JSON
), (
"a5d97c08a15c4db69f5fded523a1bfe3", # machine_id TEXT, -- platform-dependent unique hardware id
"lucky", # hostname TEXT, -- platform.node(): 'lucky'
"", # processor TEXT, -- platform.processor():
"Linux", # system TEXT, -- platform.system(): 'Linux'
"5.15.64", # release TEXT, -- platform.release(): '5.15.64'
"aarch64", # machine TEXT, -- platform.machine(): 'x86_64'
"EDT", # timezone TEXT, -- timezone, for interpreting event times
"", # freedesktop_os_release TEXT, -- requires python 3.10
"", # win32_ver TEXT, -- platform.win32_ver() as JSON
"", # mac_ver TEXT -- platform.mac_ver() as JSON
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
'afc9b06a23b74341b29d42b8312a4f8a',
'a100', #hostname TEXT, -- platform.node(): 'lucky'
'', #processor TEXT, -- platform.processor():
'Linux', #system TEXT, -- platform.system(): 'Linux'
'5.15.63', #release TEXT, -- platform.release(): '5.15.64'
'x86_64', #machine TEXT, -- platform.machine(): 'x86_64'
'EST', #timezone TEXT, -- timezone, for interpreting event times
'', #freedesktop_os_release TEXT, -- requires python 3.10
'', #win32_ver TEXT, -- platform.win32_ver() as JSON
'', #mac_ver TEXT -- platform.mac_ver() as JSON
)],
"afc9b06a23b74341b29d42b8312a4f8a",
"a100", # hostname TEXT, -- platform.node(): 'lucky'
"", # processor TEXT, -- platform.processor():
"Linux", # system TEXT, -- platform.system(): 'Linux'
"5.15.63", # release TEXT, -- platform.release(): '5.15.64'
"x86_64", # machine TEXT, -- platform.machine(): 'x86_64'
"EST", # timezone TEXT, -- timezone, for interpreting event times
"", # freedesktop_os_release TEXT, -- requires python 3.10
"", # win32_ver TEXT, -- platform.win32_ver() as JSON
"", # mac_ver TEXT -- platform.mac_ver() as JSON
),
],
)
return cur
def test_insert_machine(insert_machine):
cur = insert_machine
cur.execute('SELECT * FROM machine')
cur.execute("SELECT * FROM machine")
machines = cur.fetchall()
assert len(machines) == 2
@pytest.fixture
def insert_user(insert_machine):
cur = insert_machine
cur.executemany(
'INSERT INTO user VALUES '
'(?, ?, ?, ?, ?)',
[(
"INSERT INTO user VALUES " "(?, ?, ?, ?, ?)",
[
(
None, # id INTEGER PRIMARY KEY NOT NULL,
'jacob', #username TEXT NOT NULL,
"jacob", # username TEXT NOT NULL,
101, # userid INTEGER,
'Jacob Hinkle', #fullname TEXT,
"Jacob Hinkle", # fullname TEXT,
1, # machine INTEGER NOT NULL,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
'jacob', #username TEXT NOT NULL,
"jacob", # username TEXT NOT NULL,
10301, # userid INTEGER,
'Jacob Hinkle', #fullname TEXT,
"Jacob Hinkle", # fullname TEXT,
2, # machine INTEGER NOT NULL,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
'bob', #username TEXT NOT NULL,
"bob", # username TEXT NOT NULL,
2035, # userid INTEGER,
'Just Bob', #fullname TEXT,
"Just Bob", # fullname TEXT,
2, # machine INTEGER NOT NULL,
)],
),
],
)
return cur
def test_insert_user(insert_user):
cur = insert_user
cur.execute('SELECT * FROM user')
cur.execute("SELECT * FROM user")
users = cur.fetchall()
assert len(users) == 3
def test_invalid_user_machine(insert_user):
cur = insert_user
with pytest.raises(sqlite3.IntegrityError):
# should fail foreign key constraint
cur.execute(
'INSERT INTO user VALUES '
'(?, ?, ?, ?, ?)',
"INSERT INTO user VALUES " "(?, ?, ?, ?, ?)",
(
None, # id INTEGER PRIMARY KEY NOT NULL,
'bozo', #username TEXT NOT NULL,
"bozo", # username TEXT NOT NULL,
100, # userid INTEGER,
'Bozo the Clown', #fullname TEXT,
"Bozo the Clown", # fullname TEXT,
3, # machine INTEGER NOT NULL,
),
)
with pytest.raises(sqlite3.IntegrityError):
# should fail uniqueness constraint
cur.execute(
'INSERT INTO user VALUES '
'(?, ?, ?, ?, ?)',
"INSERT INTO user VALUES " "(?, ?, ?, ?, ?)",
(
None, # id INTEGER PRIMARY KEY NOT NULL,
'jacob', #username TEXT NOT NULL,
"jacob", # username TEXT NOT NULL,
101, # userid INTEGER,
'Jacob Hinkle', #fullname TEXT,
"Jacob Hinkle", # fullname TEXT,
1, # machine INTEGER NOT NULL,
),
)
@ -125,174 +135,196 @@ def test_invalid_user_machine(insert_user):
@pytest.fixture
def insert_store(insert_machine):
import uuid
cur = insert_machine
cur.executemany(
'INSERT INTO store VALUES '
'(?, ?, ?, ?, ?)',
[(
"INSERT INTO store VALUES " "(?, ?, ?, ?, ?)",
[
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # machine INTEGER,
'/path/to/first/store', #dbpath TEXT NOT NULL,
"/path/to/first/store", # dbpath TEXT NOT NULL,
str(uuid.uuid4()), # -- UUID generated by str(uuid.uuid4())
False, # imported BOOL,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # machine INTEGER,
'/path/to/dependency/store', #dbpath TEXT NOT NULL,
"/path/to/dependency/store", # dbpath TEXT NOT NULL,
str(uuid.uuid4()), # -- UUID generated by str(uuid.uuid4())
True, # imported BOOL,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
2, # machine INTEGER,
# same path but on a separate machine
'/path/to/first/store', #dbpath TEXT NOT NULL,
"/path/to/first/store", # dbpath TEXT NOT NULL,
str(uuid.uuid4()), # -- UUID generated by str(uuid.uuid4())
True, # imported BOOL,
)],
),
],
)
return cur
@pytest.fixture
def insert_directories(insert_store):
cur = insert_store
cur.executemany(
'INSERT INTO filedir VALUES '
'(?, ?, ?, ?, ?)',
[(
"INSERT INTO filedir VALUES " "(?, ?, ?, ?, ?)",
[
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # store INTEGER NOT NULL,
'.', #filename TEXT, -- only a filename, not a path
".", # filename TEXT, -- only a filename, not a path
None, # parent INTEGER REFERENCES filedir ON UPDATE CASCADE,
False, # frozen BOOL NOT NULL,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # store INTEGER NOT NULL,
'foo', #filename TEXT, -- only a filename, not a path
"foo", # filename TEXT, -- only a filename, not a path
1, # parent INTEGER REFERENCES filedir ON UPDATE CASCADE,
False, # frozen BOOL NOT NULL,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
2, # store INTEGER NOT NULL,
'.', #filename TEXT, -- only a filename, not a path
".", # filename TEXT, -- only a filename, not a path
None, # parent INTEGER REFERENCES filedir ON UPDATE CASCADE,
False, # frozen BOOL NOT NULL,
)],
),
],
)
cur.executemany(
'INSERT INTO filedir_version VALUES '
'(?, ?, ?, ?, ?, ?, ?, ?, ?)',
[(
"INSERT INTO filedir_version VALUES " "(?, ?, ?, ?, ?, ?, ?, ?, ?)",
[
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # INTEGER REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
datetime.datetime.now().timestamp(),
'DIR', #filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
"DIR", # filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
False, # deleted BOOL NOT NULL, -- set True when recording a deleted file
'drwxrwxr-x', #unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
"drwxrwxr-x", # unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
None, # symlink_target TEXT, -- if this is a symlink, this is the (read but not fully resolved) target. I.e. this is the "content" of the symlink.
'a84ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd', #sha256 TEXT,
"a84ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd", # sha256 TEXT,
None, # source_task INTEGER,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # INTEGER REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
datetime.datetime.now().timestamp(),
'DIR', #filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
"DIR", # filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
False, # deleted BOOL NOT NULL, -- set True when recording a deleted file
'drwxrwxr-x', #unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
"drwxrwxr-x", # unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
None, # symlink_target TEXT, -- if this is a symlink, this is the (read but not fully resolved) target. I.e. this is the "content" of the symlink.
'a84ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd', #sha256 TEXT,
"a84ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd", # sha256 TEXT,
None, # source_task INTEGER,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # INTEGER REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
datetime.datetime.now().timestamp(),
'DIR', #filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
"DIR", # filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
False, # deleted BOOL NOT NULL, -- set True when recording a deleted file
'drwxrwxr-x', #unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
"drwxrwxr-x", # unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
None, # symlink_target TEXT, -- if this is a symlink, this is the (read but not fully resolved) target. I.e. this is the "content" of the symlink.
'a84ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd', #sha256 TEXT,
"a84ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd", # sha256 TEXT,
None, # source_task INTEGER,
)],
),
],
)
return cur
def test_crossstore_directory_insert(insert_directories):
cur = insert_directories
with pytest.raises(sqlite3.IntegrityError):
# declaring directory as belonging to store 2, but parent's store is 1
cur.execute(
'INSERT INTO filedir VALUES '
'(?, ?, ?, ?, ?)',
"INSERT INTO filedir VALUES " "(?, ?, ?, ?, ?)",
(
None, # id INTEGER PRIMARY KEY NOT NULL,
2, # store INTEGER NOT NULL,
'some_dir', #filename TEXT, -- only a filename, not a path
"some_dir", # filename TEXT, -- only a filename, not a path
1, # parent INTEGER REFERENCES filedir ON UPDATE CASCADE,
False, # frozen BOOL NOT NULL,
))
),
)
for row in cur.connection.iterdump():
print(row)
cur.execute('SELECT * FROM filedir')
cur.execute("SELECT * FROM filedir")
print(cur.fetchall())
@pytest.fixture
def insert_files(insert_directories):
cur = insert_directories
cur.execute('SELECT COUNT(*) FROM filedir')
nprev, = cur.fetchone()
cur.execute("SELECT COUNT(*) FROM filedir")
(nprev,) = cur.fetchone()
cur.executemany(
'INSERT INTO filedir VALUES '
'(?, ?, ?, ?, ?)',
[(
"INSERT INTO filedir VALUES " "(?, ?, ?, ?, ?)",
[
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # store INTEGER NOT NULL,
'example.csv', #filename TEXT, -- only a filename, not a path
"example.csv", # filename TEXT, -- only a filename, not a path
1, # parent INTEGER REFERENCES filedir ON UPDATE CASCADE,
False, # frozen BOOL NOT NULL,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # store INTEGER NOT NULL,
'plots.png', #filename TEXT, -- only a filename, not a path
"plots.png", # filename TEXT, -- only a filename, not a path
2, # parent INTEGER REFERENCES filedir ON UPDATE CASCADE,
False, # frozen BOOL NOT NULL,
)]
),
],
)
cur.executemany(
'INSERT INTO filedir_version VALUES '
'(?, ?, ?, ?, ?, ?, ?, ?, ?)',
[(
"INSERT INTO filedir_version VALUES " "(?, ?, ?, ?, ?, ?, ?, ?, ?)",
[
(
None, # id INTEGER PRIMARY KEY NOT NULL,
nprev + 1, # INTEGER REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
nprev
+ 1, # INTEGER REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
datetime.datetime.now().timestamp(),
'REG', #filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
"REG", # filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
False, # deleted BOOL NOT NULL, -- set True when recording a deleted file
'drwxrwxr-x', #unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
"drwxrwxr-x", # unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
None, # symlink_target TEXT, -- if this is a symlink, this is the (read but not fully resolved) target. I.e. this is the "content" of the symlink.
'a84ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd', #sha256 TEXT,
"a84ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd", # sha256 TEXT,
None, # source_task INTEGER,
), ( # second version of first file
),
( # second version of first file
None, # id INTEGER PRIMARY KEY NOT NULL,
nprev + 1, # INTEGER REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
nprev
+ 1, # INTEGER REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
datetime.datetime.now().timestamp(),
'REG', #filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
"REG", # filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
False, # deleted BOOL NOT NULL, -- set True when recording a deleted file
'drwxr-xr-x', #unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
"drwxr-xr-x", # unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
None, # symlink_target TEXT, -- if this is a symlink, this is the (read but not fully resolved) target. I.e. this is the "content" of the symlink.
'a94ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd', #sha256 TEXT,
"a94ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd", # sha256 TEXT,
None, # source_task INTEGER,
), (
),
(
None, # id INTEGER PRIMARY KEY NOT NULL,
nprev + 2, # INTEGER REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
nprev
+ 2, # INTEGER REFERENCES filedir ON UPDATE CASCADE, -- parent filedir entry
datetime.datetime.now().timestamp(),
'REG', #filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
"REG", # filetype TEXT, -- One of 'LNK', 'DIR', 'REG', etc. See store.FSEntry.from_path for details
False, # deleted BOOL NOT NULL, -- set True when recording a deleted file
'drwxr-xr-x', #unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
"drwxr-xr-x", # unfrozen_perms TEXT, -- stat.filemode(os.stat(path).st_mode): '-rw-rw-r--'
None, # symlink_target TEXT, -- if this is a symlink, this is the (read but not fully resolved) target. I.e. this is the "content" of the symlink.
'a94ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd', #sha256 TEXT,
"a94ed33864d06615a87bc8da5258d841163f1e7969367ecd07b041ae1a18febd", # sha256 TEXT,
None, # source_task INTEGER,
)])
),
],
)
return cur
# TODO: This test is disabled until triggers are added to check for these types
# of constraints. These became much more complicated to check when I added
# filedir_version.
@ -301,16 +333,16 @@ def disabled_test_nondir_parent_directory_insert(insert_files):
with pytest.raises(sqlite3.IntegrityError):
# declaring parent as 5, but 5 is a file (plots.png)
cur.execute(
'INSERT INTO filedir VALUES '
'(?, ?, ?, ?, ?)',
"INSERT INTO filedir VALUES " "(?, ?, ?, ?, ?)",
(
None, # id INTEGER PRIMARY KEY NOT NULL,
1, # store INTEGER NOT NULL,
'some_filedir.txt', #filename TEXT, -- only a filename, not a path
"some_filedir.txt", # filename TEXT, -- only a filename, not a path
5, # parent INTEGER REFERENCES filedir ON UPDATE CASCADE,
False, # frozen BOOL NOT NULL,
))
),
)
for row in cur.connection.iterdump():
print(row)
cur.execute('SELECT * FROM filedir')
cur.execute("SELECT * FROM filedir")
print(cur.fetchall())

View File

@ -4,25 +4,28 @@ import pytest
import sys
import tempfile
@pytest.fixture
def bare_dir():
"""Create an emptry temp directory"""
with tempfile.TemporaryDirectory(prefix="nancy_testdir") as d:
yield Path(d)
@pytest.fixture
def filled_dir(bare_dir):
open(bare_dir / 'a.txt', 'w').write("foo")
os.makedirs(bare_dir / 'stats')
open(bare_dir / 'stats' / 'metrics.csv', 'w').write("bar,baz")
open(bare_dir / "a.txt", "w").write("foo")
os.makedirs(bare_dir / "stats")
open(bare_dir / "stats" / "metrics.csv", "w").write("bar,baz")
# identical to ./a.txt
open(bare_dir / 'stats' / 'a.txt', 'w').write("foo")
open(bare_dir / "stats" / "a.txt", "w").write("foo")
return bare_dir
def test_record_untracked_dir(filled_dir):
from nancy.cli.record import record
record(filled_dir, message='test_record_untracked_dir')
record(filled_dir, message="test_record_untracked_dir")
@pytest.fixture
@ -33,7 +36,6 @@ def store():
yield s
def test_schema_version_match(store):
from nancy.version import schema_version
@ -41,4 +43,3 @@ def test_schema_version_match(store):
(db_schema_ver,) = cur.execute("PRAGMA user_version;").fetchone()
assert schema_version == db_schema_ver