- #!/usr/bin/env python
-
- import stat
- import sys
- import logging
-
- # useful for debugging when stderr is redirected/captured
- _real_stderr = sys.stderr
- _sql_verbose = False
-
- if False: # flip to True to enable sqlalchemy logging below
- #lvl = logging.DEBUG
- lvl = logging.INFO
- _handler = logging.StreamHandler(_real_stderr)
- _handler.setLevel(lvl)
- _handler.setFormatter(logging.Formatter('%(asctime)s: %(message)s'))
- import sqlalchemy
- logging.getLogger('sqlalchemy').addHandler(_handler)
- logging.getLogger('sqlalchemy.engine').setLevel(lvl)
-
- from .utils import _debprint
-
- #import pdb, sys; mypdb = pdb.Pdb(stdout=sys.stderr); mypdb.set_trace()
-
- from edgold.ed448 import EDDSA448
- from unittest import mock
-
- from .hostid import hostuuid
- from .tags import TagCache
- from . import orm
- from .magic_wrap import detect_from_filename
-
- from .btv import _TestCases as bttestcase, validate_file
-
- import base64
- import base58
- from .btv import bencode
- import copy
- import datetime
- import functools
- import hashlib
- import importlib
- import io
- import itertools
- import json
- import libarchive
- import magic
- import os.path
- import pathlib
- import pasn1
- import re
- import shutil
- import socket
- import sqlalchemy
- from sqlalchemy import create_engine, select, insert, func, delete
- from sqlalchemy.orm import sessionmaker
- import string
- import subprocess
- import tempfile
- import unittest
- import uuid
-
- # The UUID for the namespace representing the path to a file
- _NAMESPACE_MEDASHARE_PATH = uuid.UUID('f6f36b62-3770-4a68-bc3d-dc3e31e429e6')
- _NAMESPACE_MEDASHARE_CONTAINER = uuid.UUID('890a9d5c-0626-4de1-ab05-9e14947391eb')
-
- _defaulthash = 'sha512'
- _validhashes = set([ 'sha256', 'sha512' ])
- _hashlengths = { len(getattr(hashlib, x)().hexdigest()): x for x in
- _validhashes }
-
- def _makehashuri(hashstr):
- hash, value = ObjectStore.makehash(hashstr).split(':')
-
- return f'hash://{hash}/{value}'
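- # Example (informal): _makehashuri('sha512:cf83e1...') yields
- # 'hash://sha512/cf83e1...'; makehash validates the string first, so
- # a malformed hash string raises ValueError instead.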
-
- def _keyordering(x):
- k, v = x
- try:
- return (MDBase._common_names_list.index(k), k, v)
- except ValueError:
- return (2**32, k, v)
-
- def _iterdictlist(obj, **kwargs):
- l = list(sorted(obj.items(**kwargs), key=_keyordering))
- for k, v in l:
- if isinstance(v, list):
- for i in sorted(v):
- yield k, i
- else:
- yield k, v
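- # A small illustration of the flattening above: a key whose value is
- # the list [ 'b', 'a' ] is yielded as two pairs, ('k', 'a') then
- # ('k', 'b'), while scalar values pass through as-is; keys present in
- # MDBase._common_names_list sort first, everything else after.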
-
- from .utils import _makeuuid, _makedatetime, _asn1coder
-
- from .mdb import MDBase
-
- class MetaData(MDBase):
- _type = 'metadata'
-
- _uniq_properties = set([ 'ms:tag' ])
-
- class Identity(MDBase):
- _type = 'identity'
-
- # Identities don't need a created_by_ref
- _common_properties = [ x for x in MDBase._common_properties if x !=
- 'created_by_ref' ]
- _common_optional = set([ x for x in MDBase._common_optional if x !=
- 'parent_refs' ] + [ 'name', 'pubkey' ])
- _common_names = set(_common_properties + list(
- MDBase._generated_properties.keys()))
-
- class Persona(object):
- '''The object that represents a persona (an identity). It
- creates the proper identity object, serializes keys for saving,
- creates objects for that persona, and handles other management.'''
-
- def __init__(self, identity=None, key=None):
- if identity is None:
- self._identity = Identity()
- else:
- self._identity = identity
-
- self._key = key
- self._pubkey = None
-
- if 'pubkey' in self._identity:
- pubkeybytes = self._identity.pubkey
- self._pubkey = EDDSA448(pub=pubkeybytes)
-
- self._created_by_ref = self._identity.uuid
-
- def Host(self, *args, **kwargs):
- kwargs['created_by_ref'] = self.uuid
-
- return self.sign(Host(*args, **kwargs))
-
- def Mapping(self, *args, **kwargs):
- kwargs['created_by_ref'] = self.uuid
-
- return self.sign(Mapping(*args, **kwargs))
-
- def Container(self, *args, **kwargs):
- kwargs['created_by_ref'] = self.uuid
-
- return self.sign(Container(*args, **kwargs))
-
- def MetaData(self, *args, **kwargs):
- kwargs['created_by_ref'] = self.uuid
-
- return self.sign(MetaData(*args, **kwargs))
-
- @property
- def uuid(self):
- '''Return the UUID of the identity represented.'''
-
- return self._identity.uuid
-
- def __repr__(self): # pragma: no cover
- r = '<Persona: has key: %s, has pubkey: %s, identity: %s>' % \
- (self._key is not None, self._pubkey is not None,
- repr(self._identity))
-
- return r
-
- @classmethod
- def from_pubkey(cls, pubkeystr):
- pubstr = base58.b58decode_check(pubkeystr)
-
- uuid, pubkey = _asn1coder.loads(pubstr)
-
- ident = Identity(uuid=uuid, pubkey=pubkey)
-
- return cls(ident)
-
- def get_identity(self):
- '''Return the Identity object for this Persona.'''
-
- return self._identity
-
- def get_pubkey(self):
- '''Get a printable version of the public key. This is used
- for importing into different programs, or for sharing.'''
-
- idobj = self._identity
- pubstr = _asn1coder.dumps([ idobj.uuid, idobj.pubkey ])
-
- return base58.b58encode_check(pubstr).decode('ascii')
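- # Note: the returned string is the base58check encoding of the
- # ASN.1 pair (uuid, pubkey), which is exactly what from_pubkey()
- # above decodes, e.g. (sketch):
- # vper = Persona.from_pubkey(persona.get_pubkey())
- # vper.verify(persona.get_identity())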
-
- def new_version(self, *args):
- '''Update the Persona's Identity object.'''
-
- self._identity = self.sign(self._identity.new_version(*args))
-
- return self._identity
-
- def store(self, fname):
- '''Store the Persona to a file. If there is a private
- key associated w/ the Persona, it will be saved as well.'''
-
- with open(fname, 'wb') as fp:
- obj = {
- 'identity': self._identity,
- }
- if self._key is not None:
- obj['key'] = \
- self._key.export_key('raw')
-
- fp.write(_asn1coder.dumps(obj))
-
- @classmethod
- def load(cls, fname):
- '''Load the Persona from the provided file.'''
-
- with open(fname, 'rb') as fp:
- objs = _asn1coder.loads(fp.read())
-
- kwargs = {}
- if 'key' in objs:
- kwargs['key'] = EDDSA448(objs['key'])
-
- return cls(Identity(objs['identity']), **kwargs)
-
- def generate_key(self):
- '''Generate a key for this Identity.
-
- Raises a RuntimeError if a key is already present.'''
-
- if self._key:
- raise RuntimeError('a key already exists')
-
- self._key = EDDSA448.generate()
- self._pubkey = self._key.public_key()
- pubkey = self._pubkey.export_key('raw')
- self._identity = self.sign(self._identity.new_version(('pubkey',
- pubkey)))
-
- def _makesigbytes(self, obj):
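- '''Return the canonical serialization of obj (all
- properties, minus any existing sig) that is used for
- signing and verification.'''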
- obj = dict(obj.items(False))
- try:
- del obj['sig']
- except KeyError:
- pass
-
- return _asn1coder.dumps(obj)
-
- def sign(self, obj):
- '''Takes the object, adds a signature, and returns the new
- object.'''
-
- sigbytes = self._makesigbytes(obj)
- sig = self._key.sign(sigbytes)
- newobj = MDBase.create_obj(obj)
- newobj.sig = sig
-
- return newobj
-
- def verify(self, obj):
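- '''Verify the signature on obj against this Persona's
- public key. Raises an exception if the signature does not
- match, and returns True otherwise.'''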
- sigbytes = self._makesigbytes(obj)
-
- self._pubkey.verify(obj['sig'], sigbytes)
-
- return True
-
- def by_file(self, fname):
- '''Return a file object for the file named fname.'''
-
- fobj = FileObject.from_file(fname, self._created_by_ref)
-
- return self.sign(fobj)
-
- class ObjectStore(object):
- '''A container to store the various MetaData objects.'''
-
- # The store indexes both the UUIDv4 of objects and the UUIDv5
- # ids used to look up FileObjects.
-
- def __init__(self, engine, version='head'):
- #orm.Base.metadata.create_all(engine)
-
- self._engine = engine
- self._ses = sessionmaker(engine)
-
- self._handle_migration(version)
-
- def _handle_migration(self, version):
- '''Handle migrating the database to a newer version.'''
-
- # running commands directly:
- # pydoc3 alembic.config.Config
- # pydoc3 alembic.commands
-
- # inspecting the scripts directly:
- # alembic/script/base.py:61
-
- from alembic import command
- from alembic.config import Config
-
- config = Config()
- config.set_main_option("script_location", "medashare:alembic")
-
- with self._engine.begin() as connection:
- config.attributes['engine'] = self._engine
- command.upgrade(config, version)
-
- def get_host(self, hostuuid):
- hostuuid = _makeuuid(hostuuid)
-
- with self._ses() as session:
- a = session.get(orm.HostTable, hostuuid)
-
- if a is None:
- raise KeyError(hostuuid)
-
- return self._by_id(a.objid, session)
-
- def get_by_type(self, _type):
- try:
- if issubclass(_type, MDBase):
- _type = _type._type
- except TypeError:
- pass
-
- with self._ses() as session:
- for i in session.query(orm.MetaDataObject.data).where(
- orm.MetaDataObject.type == _type):
- yield i.data
-
- def get_hosts(self):
- return self.get_by_type(Host)
-
- @staticmethod
- def makehash(hashstr, strict=True):
- '''Take a hash or hash string, and return a valid hash
- string from it.
-
- This makes sure that it is of the correct type and length.
-
- If strict is False, the function will detect the length and
- return a valid hash string if one can be found.
-
- By default, the string must be prefixed with the type,
- followed by a colon, followed by the value in hex in all
- lower case characters.'''
-
- try:
- hash, value = hashstr.split(':')
- except ValueError:
- if strict:
- raise
-
- hash = _hashlengths[len(hashstr)]
- value = hashstr
-
- bvalue = value.encode('ascii')
- if strict and len(bvalue.translate(None,
- string.hexdigits.lower().encode('ascii'))) != 0:
- raise ValueError('value has invalid hex digits (must be lower case)', value)
-
- if hash in _validhashes:
- return ':'.join((hash, value))
-
- raise ValueError
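- # Examples (mirroring the tests below): a bare 64 character hex
- # digest passed w/ strict=False is detected by length and returned
- # as 'sha256:<value>', while makehash('slkj') raises ValueError.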
-
- def __len__(self):
- with self._ses() as session:
- return list(session.query(func.count(
- orm.MetaDataObject.uuid)))[0][0]
-
- def __iter__(self):
- with self._ses() as session:
- for i in session.query(orm.MetaDataObject.data).all():
- yield i.data
-
- @classmethod
- def load(cls, fname):
- engine = create_engine("sqlite+pysqlite:///%s" % fname,
- echo=_sql_verbose, future=True)
-
- return cls(engine)
-
- def store(self, fname):
- '''Write out the objects in the store to the file named
- fname. Currently a no-op: the backing SQLite engine
- persists objects as they are loaded.'''
-
- pass
-
- def _add_uuidv5(self, id, obj, session):
- session.execute(delete(orm.UUIDv5Table).where(
- orm.UUIDv5Table.uuid == id))
-
- o = orm.UUIDv5Table(uuid=id, objid=obj.uuid)
- session.add(o)
-
- def _lock(self, session):
- '''Function to issue a write to "lock" the database transaction.'''
-
- res = list(session.scalars(select(orm.Dummy).where(
- orm.Dummy.id == 1)))
-
- if res:
- session.delete(res[0])
- else:
- d = orm.Dummy(id=1)
- session.add(d)
-
- def loadobj(self, obj):
- '''Load obj into the data store.'''
-
- obj = MDBase.create_obj(obj)
-
- with self._ses() as session:
- self._lock(session)
-
- oldobj = session.get(orm.MetaDataObject, obj.uuid)
- #if oldobj.modified > obj.modified:
- # return
- if oldobj is not None:
- # XXX - missing cleanup of indexes
- session.delete(oldobj)
-
- sobj = orm.MetaDataObject(uuid=obj.uuid, type=obj.type,
- modified=obj.modified, data=obj)
- session.add(sobj)
-
- if obj.type == 'file':
- objid = _makeuuid(obj.id)
- oldobj = self._by_id(objid, session)
- if oldobj is not None:
- # pick which obj
- if oldobj.modified > obj.modified:
- session.delete(session.get(
- orm.MetaDataObject,
- obj.uuid))
- obj = oldobj
- else:
- # get rid of the old obj
- session.delete(session.get(
- orm.MetaDataObject,
- oldobj.uuid))
-
- self._add_uuidv5(obj.id, obj, session)
- elif obj.type == 'container':
- self._add_uuidv5(obj.make_id(obj.uri), obj,
- session)
- elif obj.type == 'host':
- o = orm.HostTable(hostid=_makeuuid(
- obj.hostuuid), objid=obj.uuid)
- session.add(o)
- elif obj.type == 'mapping':
- hostid = _makeuuid(hostuuid())
-
- maps = [ (lambda a, b: orm.HostMapping(
- hostid=uuid.UUID(a), objid=obj.uuid))(
- *x.split(':', 1)) for x in obj.mapping ]
- session.add_all(maps)
- try:
- hashes = obj.hashes
- except AttributeError:
- pass
- else:
- for j in hashes:
- h = self.makehash(j)
- r = session.get(orm.HashTable,
- dict(hash=h, uuid=obj.uuid))
- if r is None:
- session.add(orm.HashTable(
- hash=h, uuid=obj.uuid))
-
- session.commit()
-
- def drop_uuid(self, uuid):
- uuid = _makeuuid(uuid)
-
- with self._ses() as session:
- obj = session.get(orm.MetaDataObject, uuid)
- session.delete(obj)
-
- obj = obj.data
-
- if obj.type == 'file':
- session.execute(delete(orm.UUIDv5Table).where(
- orm.UUIDv5Table.uuid == obj.id))
-
- for j in obj.hashes:
- h = self.makehash(j)
- session.execute(delete(orm.HashTable).where(
- orm.HashTable.hash == h,
- orm.HashTable.uuid == obj.uuid))
-
- session.commit()
-
- def by_id(self, id):
- '''Look up an object by its UUID.'''
-
- id = _makeuuid(id)
-
- with self._ses() as session:
- res = self._by_id(id, session)
- if res is None:
- raise KeyError(id)
-
- return res
-
- def _by_id(self, id, session):
- if id.version == 5:
- res = session.get(orm.UUIDv5Table, id)
- if res is None:
- return
-
- id = res.objid
-
- res = session.get(orm.MetaDataObject, id)
- if res is None:
- return
-
- return res.data
-
- def by_hash(self, hash, types=None):
- '''Look up an object by its hash value.
-
- types is either a list of types, or None meaning all.'''
-
- h = self.makehash(hash, strict=False)
-
- types = True if types is None else \
- orm.MetaDataObject.type.in_(types)
-
- sel = select(orm.MetaDataObject.data).where(
- orm.MetaDataObject.uuid == orm.HashTable.uuid,
- orm.HashTable.hash == h, types)
-
- with self._ses() as session:
- r = list(session.scalars(sel))
-
- return r
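- # Usage sketch: by_hash accepts either a full 'sha256:...' string
- # or, because strict=False above, a bare hex digest whose length
- # selects the algorithm; types=('metadata',) narrows the results
- # to MetaData objects.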
-
- def get_metadata(self, fname, persona, create_metadata=True):
- '''Get all MetaData objects for fname, or create one if
- not found.
-
- If a FileObject is not present, one will be created.
-
- A Persona must be passed in to create the FileObject and
- MetaData objects as needed.
-
- A MetaData object will be created if create_metadata is
- True, which is the default.
-
- Note: if a new MetaData object is created, it is not
- stored in the database automatically. It is expected that
- it will be modified and then saved, so call ObjectStore.loadobj
- with it to save it.
- '''
-
- try:
- fobj = self.by_file(fname, ('file',))[0]
- except KeyError:
- fobj = persona.by_file(fname)
-
- self.loadobj(fobj)
-
- # we now have the fobj, get the metadata for it.
- try:
- objs = self.by_file(fname)
- except KeyError:
- if create_metadata:
- objs = [ persona.MetaData(hashes=fobj.hashes) ]
- else:
- objs = [ ]
-
- return objs
-
- def _get_hostmappings(self):
- '''Return a list of (lclpath, hostid, rempath) tuples for
- all the mappings involving this host; each mapping entry
- is a "hostuuid:path" string.'''
-
- hostid = _makeuuid(hostuuid())
-
- sel = select(orm.MetaDataObject.data).where(
- orm.HostMapping.hostid == hostid,
- orm.HostMapping.objid == orm.MetaDataObject.uuid)
-
- res = []
- with self._ses() as session:
- # XXX - view
- for obj in session.scalars(sel):
- maps = [ (lambda a, b: (uuid.UUID(a),
- pathlib.Path(b).resolve()))(*x.split(':',
- 1)) for x in obj.mapping ]
- for idx, (id, path) in enumerate(maps):
- if hostid == id:
- # add other to mapping
- other = tuple(maps[(idx + 1) %
- 2])
- res.append((path, ) + other)
- return res
-
- def by_file(self, fname, types=('metadata', )):
- '''Return a list of metadata objects for the file named fname.
-
- Will check the mapping database to get hashes, and possibly
- return that FileObject if requested.
-
- Will raise a KeyError if this file does not exist in
- the database.
-
- Will raise a ValueError if fname currently does not
- match what is in the database.
- '''
-
- fid = FileObject.make_id(fname)
-
- #print('bf:', repr(fid), file=_real_stderr)
- try:
- fobj = self.by_id(fid)
- lclfile = None
- except KeyError:
- # check mappings
- fname = pathlib.Path(fname).resolve()
- for lclpath, hostid, rempath in self._get_hostmappings():
- if fname.parts[:len(lclpath.parts)] == lclpath.parts:
- try:
- rempath = pathlib.Path(
- *rempath.parts +
- fname.parts[len(
- lclpath.parts):])
- fid = FileObject.make_id(
- rempath, hostid)
- fobj = self.by_id(fid)
- lclfile = fname
- break
- except KeyError:
- continue
- else:
- raise
-
- fobj.verify(lclfile)
-
- for i in fobj.hashes:
- j = self.by_hash(i)
-
- # Filter out non-metadata objects
- j = [ x for x in j if x.type in types ]
- if j:
- return j
- else:
- raise KeyError('unable to find metadata for file: %s' %
- repr(fname))
-
- def _readfp(fp):
- while True:
- r = fp.read(64*1024)
- # libarchive returns None on EOF
- if r == b'' or r is None:
- return
-
- yield r
-
- def _hashfile(fname):
- with open(fname, 'rb') as fp:
- return _hashfp(fp)
-
- def _hashfp(fp):
- hash = getattr(hashlib, _defaulthash)()
- for r in _readfp(fp):
- hash.update(r)
-
- return '%s:%s' % (_defaulthash, hash.hexdigest())
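- # For reference, hashing an empty file w/ the default sha512 yields
- # 'sha512:cf83e1357eefb8bd...', the well known digest of the empty
- # string.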
-
- class Host(MDBase):
- _type = 'host'
-
- _class_instance_properties = {
- 'hostuuid': _makeuuid,
- }
-
- class Mapping(MDBase):
- _type = 'mapping'
-
- class FileObject(MDBase):
- _type = 'file'
-
- _class_instance_properties = {
- 'hostid': _makeuuid,
- 'id': _makeuuid,
- 'mtime': _makedatetime,
- }
-
- @property
- def fullname(self):
- return os.path.join(self.dir, self.filename)
-
- @staticmethod
- def make_id(fname, hostid=None):
- '''Take a local file name, and make the id for it. Note
- that it converts from the local path separator to a forward
- slash so that it will be the same between Windows and Unix
- systems.'''
-
- if hostid is None:
- hostid = hostuuid()
-
- fname = os.path.realpath(fname)
- return uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
- str(hostid) + '/'.join(os.path.split(fname)))
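- # Example (hypothetical path, matching test_enumeratedir below):
- # the id for /tmp/subdir/test.txt is
- # uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
- # str(hostid) + '/tmp/subdir/test.txt'), so the same path on the
- # same host always maps to the same UUIDv5.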
-
- _statsymbtoname = { getattr(stat, x): 'stat.' + x for x in dir(stat) if x.startswith('S_') }
-
- @classmethod
- def _modetosymbolic(cls, mode): # pragma: no cover
- r = []
- while mode:
- nbit = -mode & mode
-
- r.append(cls._statsymbtoname[nbit])
-
- mode = mode & ~nbit
-
- return '|'.join(r)
-
- @classmethod
- def _real_stat_repr(cls, st): # pragma: no cover
- return 'os.stat_result' \
- '((%s, %d, %d, %d, %d, %d, %d, %d, %.6f, %d))' % \
- (cls._modetosymbolic(st.st_mode), 10, 100, 1, 100, 100,
- st.st_size, st.st_atime, st.st_mtime, st.st_ctime)
-
- @classmethod
- def from_file(cls, filename, created_by_ref):
- filename = os.path.abspath(filename)
- s = os.stat(filename)
- # keep so that when new files are added, it's easy to get stat
- #_debprint(repr(filename), cls._real_stat_repr(s))
- # XXX - race here, fix w/ checking mtime before/after?
- obj = {
- 'created_by_ref': created_by_ref,
- 'hostid': hostuuid(),
- 'dir': os.path.dirname(filename),
- 'filename': os.path.basename(filename),
- 'id': cls.make_id(filename),
- 'mtime': datetime.datetime.fromtimestamp(s.st_mtime,
- tz=datetime.timezone.utc),
- 'size': s.st_size,
- 'hashes': [ _hashfile(filename), ],
- }
-
- return cls(obj)
-
- def verify(self, lclfile=None):
- '''Verify that this FileObject is still valid. By
- default, it only checks the mtime and size.
-
- It will raise a ValueError if the file does not match.'''
-
- if lclfile is None:
- s = os.stat(os.path.join(self.dir, self.filename))
- else:
- s = os.stat(lclfile)
-
- mtimets = datetime.datetime.fromtimestamp(s.st_mtime,
- tz=datetime.timezone.utc).timestamp()
-
- #print(repr(self), repr(s), s.st_mtime, file=_real_stderr)
- if self.mtime.timestamp() != mtimets or \
- self.size != s.st_size:
- raise ValueError('file %s has changed' %
- repr(self.filename))
-
- class Container(MDBase):
- _type = 'container'
-
- _common_optional = MDBase._common_optional | set([ 'uri' ])
-
- @staticmethod
- def make_id(uri):
- return uuid.uuid5(_NAMESPACE_MEDASHARE_CONTAINER, uri)
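- # Container ids are derived from the container's URI (a magnet: URI
- # for torrents, a hash:// URI for archives; see the handlers below),
- # so rescanning the same container updates one object rather than
- # creating duplicates.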
-
- def enumeratedir(_dir, created_by_ref):
- '''Enumerate all the files (not recursive; directories are skipped) in _dir.
-
- Returned is a list of FileObjects.'''
-
- return [FileObject.from_file(os.path.join(_dir, x),
- created_by_ref) for x in sorted(os.listdir(_dir)) if not
- os.path.isdir(os.path.join(_dir, x)) ]
-
- def _get_paths(options):
- fnames = (
- '.medashare_identity.pasn1',
- '.medashare_store.sqlite3',
- '.medashare_cache.pasn1' )
-
- if 'MEDASHARE_PATH' in os.environ:
- return ( os.path.expanduser(
- os.path.join(os.environ['MEDASHARE_PATH'], x)) for x in
- fnames )
-
- return ( os.path.expanduser('~/' + x) for x in fnames )
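- # With MEDASHARE_PATH unset, this yields ~/.medashare_identity.pasn1,
- # ~/.medashare_store.sqlite3 and ~/.medashare_cache.pasn1, expanded
- # to the user's home directory.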
-
- def init_datastructs(f):
- @functools.wraps(f)
- def wrapper(options):
-
- identfname, storefname, cachefname = _get_paths(options)
-
- # create the persona
- try:
- persona = Persona.load(identfname)
- except FileNotFoundError:
- print('ERROR: Identity not created, create w/ genident.',
- file=sys.stderr)
- sys.exit(1)
-
- # create the object store
- engine = create_engine("sqlite+pysqlite:///%s" % storefname,
- echo=_sql_verbose, future=True)
-
- objstr = ObjectStore(engine)
-
- # create the cache
- cache = TagCache.load(cachefname)
-
- try:
- return f(options, persona, objstr, cache)
- finally:
- if cache.modified:
- cache.store(cachefname)
- return wrapper
-
- def cmd_genident(options):
- identfname, _, _ = _get_paths(options)
-
- if os.path.exists(identfname):
- print('Error: Identity already created.', file=sys.stderr)
- sys.exit(1)
-
- persona = Persona()
- persona.generate_key()
-
- persona.new_version(*(x.split('=', 1) for x in options.tagvalue))
-
- persona.store(identfname)
-
- def cmd_ident(options):
- identfname, _, _ = _get_paths(options)
-
- persona = Persona.load(identfname)
-
- if options.tagvalue:
- persona.new_version(*(x.split('=', 1) for x in
- options.tagvalue))
-
- persona.store(identfname)
- else:
- ident = persona.get_identity()
- for k, v in _iterdictlist(ident, skipcommon=False):
- print('%s:\t%s' % (k, v))
-
- def cmd_pubkey(options):
- identfname, _, _ = _get_paths(options)
-
- persona = Persona.load(identfname)
-
- print(persona.get_pubkey())
-
- @init_datastructs
- def cmd_modify(options, persona, objstr, cache):
- # because of how argparse works, only one file will be collected;
- # multiple files will end up in modtagvalues, so we need to
- # find and move them.
-
- for idx, i in enumerate(options.modtagvalues):
- if i[0] not in { '+', '-' }:
- # move remaining files
- options.files[0:0] = options.modtagvalues[idx:]
- del options.modtagvalues[idx:]
- break
-
- props = [[ x[0] ] + x[1:].split('=', 1) for x in options.modtagvalues]
- if any(x[0] not in ('+', '-') for x in props):
- print('ERROR: tag needs to start with a "+" (add) or a "-" (remove).', file=sys.stderr)
- sys.exit(1)
-
- badtags = list(x[1] for x in props if x[1] in (MDBase._common_names |
- MDBase._common_optional))
- if any(badtags):
- print('ERROR: invalid tag%s: %s.' % ( 's' if
- len(badtags) > 1 else '', repr(badtags)), file=sys.stderr)
- sys.exit(1)
-
- adds = [ x[1:] for x in props if x[0] == '+' ]
-
- if any((len(x) != 2 for x in adds)):
- print('ERROR: invalid tag, needs an "=".', file=sys.stderr)
- sys.exit(1)
-
- dels = [ x[1:] for x in props if x[0] == '-' ]
-
- for i in options.files:
- #print('a:', repr(i), file=_real_stderr)
-
- try:
- objs = objstr.get_metadata(i, persona)
- #print('d:', repr(i), repr(objs), file=_real_stderr)
- except FileNotFoundError:
- print('ERROR: file not found: %s, or invalid tag specification.' % repr(i), file=sys.stderr)
- sys.exit(1)
-
-
- for j in objs:
- #print('c:', repr(j), file=_real_stderr)
- # make into key/values
- # copy as we modify it later, which is bad
- obj = j.__to_dict__().copy()
-
- # delete tags
- for k in dels:
- try:
- key, v = k
- except ValueError:
- del obj[k[0]]
- else:
- obj[key].remove(v)
-
- # add tags
- uniqify = set()
- for k, v in adds:
- obj.setdefault(k, []).append(v)
- if k in j._uniq_properties:
- uniqify.add(k)
-
- for k in uniqify:
- obj[k] = list(set(obj[k]))
-
- #print('a:', repr(obj), file=_real_stderr)
- del obj['modified']
- nobj = MDBase.create_obj(obj)
-
- objstr.loadobj(nobj)
-
- def printhost(host):
- print('%s\t%s' % (host.name, host.hostuuid))
-
- @init_datastructs
- def cmd_mapping(options, persona, objstr, cache):
- if options.mapping is not None:
- parts = [ x.split(':', 1) for x in options.mapping ]
-
- if len(parts[0]) == 1:
- parts[0] = [ hostuuid(), parts[0][0] ]
-
- if parts[0][0] == hostuuid():
- parts[0][1] = str(pathlib.Path(parts[0][1]).resolve())
-
- if parts[1][1][0] != '/':
- print('ERROR: host path must be absolute, is %s.' %
- repr(parts[1][1]), file=sys.stderr)
- sys.exit(1)
-
- try:
- [ objstr.get_host(x[0]) for x in parts ]
- except KeyError as e:
- print('ERROR: Unable to find host %s' %
- str(e.args[0]), file=sys.stderr)
- sys.exit(1)
-
- m = persona.Mapping(mapping=[ ':'.join(x) for x in parts ])
-
- objstr.loadobj(m)
-
- @init_datastructs
- def cmd_hosts(options, persona, objstr, cache):
- selfuuid = hostuuid()
-
- try:
- host = objstr.get_host(selfuuid)
- except KeyError:
- host = persona.Host(name=socket.gethostname(), hostuuid=selfuuid)
- objstr.loadobj(host)
-
- printhost(host)
-
- hosts = objstr.get_hosts()
-
- for i in hosts:
- if i == host:
- continue
-
- printhost(i)
-
- def genstartstop(cnt, idx):
- idx = min(idx, cnt - 10)
- idx = max(0, idx)
-
- maxstart = max(0, cnt - 20)
-
- startidx = min(max(0, idx - 10), maxstart)
- endidx = min(cnt, startidx + 20)
-
- return startidx, endidx
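- # Windowing examples (mirroring test_genstartstop below): the 20
- # entry page is centered on idx where possible and clamped at the
- # ends, e.g. genstartstop(124, 53) == (43, 63) and
- # genstartstop(124, 124) == (104, 124); counts under 20 always give
- # (0, cnt).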
-
- def getnextfile(files, idx):
- # original data in case of abort
- origfiles = files
- origidx = idx
-
- # current selection (last file or dir)
- curselidx = origidx
-
- currentcnt = None
-
- while True:
- if len(files) != currentcnt:
- currentcnt = len(files)
- maxidx = max(0, currentcnt - 10)
- idx = min(maxidx, idx)
-
- startidx, endidx = genstartstop(currentcnt, idx)
- subset = files[startidx:endidx]
- selfile = -1 if curselidx < startidx or curselidx >= startidx + \
- 20 else curselidx - startidx
- print('%2d) Parent directory' % 0)
- for i, f in enumerate(subset):
- print('%2d)%1s%s%s' % (i + 1, '*' if i == selfile else '', repr(str(f)), '/' if f.is_dir() else ''))
-
- print('P) Previous page')
- print('N) Next page')
- print('A) Abort')
- print('Selection:')
- inp = sys.stdin.readline().strip()
-
- if inp.lower() == 'p':
- idx = max(0, idx - 19)
- continue
- if inp.lower() == 'n':
- idx = min(currentcnt - 1, idx + 19)
- continue
- if inp.lower() == 'a':
- return origfiles, origidx
-
- try:
- inp = int(inp)
- except ValueError:
- print('Invalid selection.')
- continue
-
- if inp == 0:
- curdir = files[idx].parent
- files = sorted(files[idx].parent.parent.iterdir())
- idx = curselidx = files.index(curdir)
- continue
- if inp < 1 or inp > len(subset):
- print('Invalid selection.')
- continue
-
- newidx = startidx - 1 + inp
-
- if files[newidx].is_dir():
- files = sorted(files[newidx].iterdir())
- curselidx = idx = 0
- continue
-
- return files, newidx
-
- def checkforfile(objstr, persona, curfile, ask=False):
- try:
- fobj = objstr.by_file(curfile, ('file',))
- except (ValueError, KeyError):
- if not ask:
- return
-
- while True:
- print('file unknown, hash(y/n)?')
- inp = sys.stdin.readline().strip().lower()
- if inp == 'n':
- return
- if inp == 'y':
- break
-
- try:
- fobj = persona.by_file(curfile)
- except (FileNotFoundError, KeyError) as e:
- print('ERROR: file not found: %s' % repr(curfile), file=sys.stderr)
- return
- else:
- objstr.loadobj(fobj)
-
- return fobj
-
- @init_datastructs
- def cmd_interactive(options, persona, objstr, cache):
- files = [ pathlib.Path(x) for x in options.files ]
-
- cache.count = 15
-
- autoskip = True
-
- idx = 0
- inp = '' # last menu selection; consulted by the autoskip branch below
- if not files:
- files = sorted(pathlib.Path('.').iterdir())
-
- while True:
- curfile = files[idx]
-
- fobj = checkforfile(objstr, persona, curfile, not autoskip)
-
- if fobj is None and autoskip and idx > 0 and idx < len(files) - 1:
- # if we are auto skipping, and within range, continue
- if inp == '1':
- idx = max(0, idx - 1)
- continue
- if inp == '2':
- idx = min(len(files) - 1, idx + 1)
- continue
-
- print('Current: %s' % repr(str(curfile)))
-
- if fobj is None:
- print('No file object for this file.')
- else:
- try:
- objs = objstr.by_file(curfile)
- except KeyError:
- print('No tags or metadata object for this file.')
- else:
- for k, v in _iterdictlist(objs[0]):
- if k in { 'sig', 'hashes' }:
- continue
- print('%s:\t%s' % (k, v))
-
- if idx == 0:
- print('1) No previous file')
- else:
- print('1) Previous: %s' % repr(str(files[idx - 1])))
-
- if idx + 1 == len(files):
- print('2) No next file')
- else:
- print('2) Next: %s' % repr(str(files[idx + 1])))
-
- print('3) List files')
- print('4) Browse directory of file.')
- print('5) Browse original list of files.')
- print('6) Add new tag.')
- print('7) Open file.')
- print('8) Turn auto skip %s' % ('off' if autoskip else 'on'))
-
- tags = cache.tags()
-
- for pos, (tag, value) in enumerate(tags):
- print('%s) %s=%s' % (string.ascii_lowercase[pos], tag, value))
-
- print('Q) Quit')
-
- print('Select option: ')
-
- inp = sys.stdin.readline().strip()
-
- if inp == '1':
- idx = max(0, idx - 1)
- continue
- if inp == '2':
- idx = min(len(files) - 1, idx + 1)
- continue
- if inp == '3':
- files, idx = getnextfile(files, idx)
- continue
- if inp == '4':
- files = sorted(curfile.parent.iterdir())
- try:
- idx = files.index(curfile)
- except ValueError:
- print('WARNING: File no longer present.')
- idx = 0
- continue
- if inp == '5':
- files = [ pathlib.Path(x) for x in options.files ]
- try:
- idx = files.index(curfile)
- except ValueError:
- print('WARNING: File not present.')
- idx = 0
- continue
- if inp == '6':
- print('Tag?')
- try:
- tag, value = sys.stdin.readline().strip().split('=', 1)
- except ValueError:
- print('Invalid tag, no "=".')
- else:
- cache.add((tag, value))
- metadata = objstr.get_metadata(curfile, persona)[0]
-
- metadata = metadata.new_version((tag, value))
-
- objstr.loadobj(metadata)
-
- continue
- if inp == '7':
- subprocess.run(('open', curfile))
- continue
- if inp == '8':
- autoskip = not autoskip
- continue
-
- if inp.lower() == 'q':
- break
-
- try:
- i = string.ascii_lowercase.index(inp.lower())
- cache.add(tags[i])
- except (ValueError, IndexError):
- pass
- else:
- metadata = objstr.get_metadata(curfile, persona)[0]
-
- metadata = metadata.new_version(tags[i])
-
- objstr.loadobj(metadata)
-
- continue
-
- print('Invalid selection.')
-
- @init_datastructs
- def cmd_dump(options, persona, objstr, cache):
- print(persona.get_identity().encode('json'))
-
- for i in objstr:
- print(i.encode('json'))
-
- def cmd_auto(options):
- for i in options.files:
- mf = detect_from_filename(i)
-
- primary = mf[0].split('/', 1)[0]
- mt = mf[0]
- if primary == 'text':
- mt += '; charset=%s' % mf[1]
-
- print('Set:')
- print('\tmimetype:\t%s' % mt)
- print()
- print('Apply (y/N)?')
-
- inp = sys.stdin.readline()
-
- if inp.strip().lower() in ('y', 'yes'):
- options.modtagvalues = [ '+mimetype=%s' % mt ]
- cmd_modify(options)
-
- @init_datastructs
- def cmd_list(options, persona, objstr, cache):
- for i in options.files:
- try:
- objs = objstr.by_file(i)
- except (ValueError, KeyError):
- # create the file, it may have the same hash
- # as something else
- try:
- fobj = persona.by_file(i)
- objstr.loadobj(fobj)
-
- objs = objstr.by_file(i)
- except (FileNotFoundError, KeyError) as e:
- print('ERROR: file not found: %s' % repr(i), file=sys.stderr)
- sys.exit(1)
-
- for j in objstr.by_file(i):
- for k, v in _iterdictlist(j):
- print('%s:\t%s' % (k, v))
-
- # This is needed so that if a FileObject is created, which may be
- # expensive (hashing a large file), it gets saved.
-
- def handle_bittorrent(fname, persona, objstr):
- with open(fname, 'rb') as fp:
- torrent = bencode.bdecode(fp.read())
- bencodedinfo = bencode.bencode(torrent['info'])
- infohash = hashlib.sha1(bencodedinfo).hexdigest()
-
- # XXX - not entirely happy w/ URI
- uri = 'magnet:?xt=urn:btih:%s&dn=%s' % (infohash,
- torrent['info']['name'].decode('utf-8'))
-
- try:
- cont = objstr.by_id(Container.make_id(uri))
- except KeyError:
- pass
- else:
- if 'incomplete' not in cont:
- print('Warning, container already complete, skipping %s.' % repr(fname), file=sys.stderr)
- return
-
- good, bad = validate_file(fname)
-
- if bad:
- print('Warning, incomplete/invalid files, not added for %s:' %
- repr(fname), file=sys.stderr)
- print('\n'.join('\t%s' %
- repr(str(pathlib.Path(*x.parts[1:]))) for x in
- sorted(bad)), file=sys.stderr)
-
- files = []
- hashes = []
- for j in sorted(good):
- files.append(str(pathlib.PosixPath(*j.parts[1:])))
- try:
- fobj = objstr.by_file(j, ('file',))[0]
- except (ValueError, KeyError):
- fobj = persona.by_file(j)
- objstr.loadobj(fobj)
-
- # XXX - ensure only one is added?
- hashes.extend(fobj.hashes)
-
- kwargs = dict(files=files, hashes=hashes,
- uri=uri)
-
- if bad:
- kwargs['incomplete'] = True
-
- # XXX - doesn't combine files/hashes, that is if a
- # Container has one set of good files, and then the
- # next scan has a different set, only the second set
- # will be present, not any from the first set.
-
- try:
- cont = objstr.by_id(Container.make_id(uri))
- cont = cont.new_version(dels=() if bad
- else ('incomplete',), replaces=kwargs.items())
- except KeyError:
- cont = persona.Container(**kwargs)
-
- objstr.loadobj(cont)
-
- def handle_archive(fname, persona, objstr):
- with libarchive.Archive(fname) as arch:
- files = []
- hashes = []
-
- for i in arch:
- if not i.isfile():
- continue
-
- files.append(i.pathname)
-
- with arch.readstream(i.size) as fp:
- hashes.append(_hashfp(fp))
-
- try:
- fobj = objstr.by_file(fname, ('file',))[0]
- except (ValueError, KeyError):
- fobj = persona.by_file(fname)
- objstr.loadobj(fobj)
-
- uri = _makehashuri(fobj.hashes[0])
-
- kwargs = dict(files=files, hashes=hashes,
- uri=uri)
- try:
- cont = objstr.by_id(Container.make_id(uri))
- # XXX - only update when different, check uri
- cont = cont.new_version(replaces=kwargs.items())
- except KeyError:
- cont = persona.Container(**kwargs)
-
- objstr.loadobj(cont)
-
- _container_mapping = {
- 'application/x-bittorrent': handle_bittorrent,
- 'application/x-tar': handle_archive,
- }
-
- @init_datastructs
- def cmd_container(options, persona, objstr, cache):
- for i in options.files:
- mf = detect_from_filename(i)
- #_debprint('mf:', repr(mf))
- fun = _container_mapping[mf.mime_type]
-
- fun(i, persona, objstr)
-
- def _json_objstream(fp):
- inp = fp.read()
-
- jd = json.JSONDecoder()
-
- while inp:
- inp = inp.strip()
- jobj, endpos = jd.raw_decode(inp)
-
- yield jobj
-
- inp = inp[endpos:]
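- # This allows several JSON objects to be concatenated in one stream,
- # e.g. '{"a": 1} {"b": 2}' yields two dicts; raw_decode reports where
- # each object ends so the next parse picks up from there.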
-
- @init_datastructs
- def cmd_import(options, persona, objstr, cache):
- for jobj in _json_objstream(sys.stdin):
- if options.sign:
- cbr = _makeuuid(jobj['created_by_ref'])
- if cbr != persona.uuid:
- # new owner
- jobj['created_by_ref'] = persona.uuid
-
- # drop old parts
- jobj.pop('uuid', None)
- jobj.pop('modified', None)
-
- obj = MDBase.create_obj(jobj)
-
- if options.sign:
- obj = persona.sign(obj)
-
- objstr.loadobj(obj)
-
- @init_datastructs
- def cmd_drop(options, persona, objstr, cache):
- for i in options.uuids:
- objstr.drop_uuid(i)
-
- @init_datastructs
- def cmd_search(options, persona, objstr, cache):
- args = options.args.copy()
-
- _type = args.pop(0)
-
- searches = [ (x[0], ) + tuple(x[1:].split('=', 1)) for x in args ]
- #print(repr(searches), file=_real_stderr)
-
- def testfun(x, s=searches):
- try:
- x = objstr.by_hash(x['hashes'][0], ('metadata',))[0]
- except IndexError:
- # no metadata object
-
- # if we need anything, it's not present
- if any(x[0] == '+' for x in s):
- return False
-
- return True
-
- try:
- for i in s:
- try:
- op, key, value = i
- except ValueError:
- op, key = i
- value = None
-
- if op == '+':
- if value is None:
- if key not in x:
- return False
- elif value not in x[key]:
- return False
- elif op == '-':
- if value is None:
- if key in x:
- return False
- elif value in x[key]:
- return False
- else:
- raise ValueError('unhandled op: %s' % repr(op))
- else:
- return True
-
- except KeyError:
- return False
-
- r = ( x for x in objstr if x.type == 'file' and testfun(x) )
-
- if _type == 'file':
- r = ( x.fullname for x in r )
- else:
- raise ValueError('unhandled type: %s' % repr(_type))
-
- for i in r:
- print(i)
-
- def main():
- import argparse
-
- parser = argparse.ArgumentParser()
-
- parser.add_argument('--db', '-d', type=str,
- help='base name for storage')
-
- subparsers = parser.add_subparsers(title='subcommands',
- description='valid subcommands', help='additional help')
-
- parser_help = subparsers.add_parser('help', help='get help')
- parser_help.set_defaults(func=lambda *args: (parser.print_help(), sys.exit(0)))
-
- parser_gi = subparsers.add_parser('genident', help='generate identity')
- parser_gi.add_argument('tagvalue', nargs='+',
- help='add the arg as metadata for the identity, tag=[value]')
- parser_gi.set_defaults(func=cmd_genident)
-
- parser_i = subparsers.add_parser('ident', help='update identity')
- parser_i.add_argument('tagvalue', nargs='*',
- help='add the arg as metadata for the identity, tag=[value]')
- parser_i.set_defaults(func=cmd_ident)
-
- parser_pubkey = subparsers.add_parser('pubkey',
- help='print public key of identity')
- parser_pubkey.set_defaults(func=cmd_pubkey)
-
- # used so that - isn't treated as an option
- parser_mod = subparsers.add_parser('modify',
- help='modify tags on file(s)', prefix_chars='@')
- parser_mod.add_argument('modtagvalues', nargs='+',
- help='add (+) or delete (-) the tag=[value], for the specified files')
- parser_mod.add_argument('files', nargs='+',
- help='files to modify')
- parser_mod.set_defaults(func=cmd_modify)
-
- parser_auto = subparsers.add_parser('auto',
- help='automatic detection of file properties')
- parser_auto.add_argument('files', nargs='+',
- help='files to modify')
- parser_auto.set_defaults(func=cmd_auto)
-
- parser_list = subparsers.add_parser('list', help='list tags on file(s)')
- parser_list.add_argument('files', nargs='+',
- help='files to modify')
- parser_list.set_defaults(func=cmd_list)
-
- parser_container = subparsers.add_parser('container',
- help='file is examined as a container and the internal files imported as entries')
- parser_container.add_argument('files', nargs='+',
- help='files to modify')
- parser_container.set_defaults(func=cmd_container)
-
- parser_hosts = subparsers.add_parser('hosts',
- help='dump all the hosts, self is always first')
- parser_hosts.set_defaults(func=cmd_hosts)
-
- parser_mapping = subparsers.add_parser('mapping',
- help='list mappings, or create a mapping')
- parser_mapping.add_argument('--create', dest='mapping', nargs=2,
- help='mapping to add, host|hostuuid:path host|hostuuid:path')
- parser_mapping.set_defaults(func=cmd_mapping)
-
- parser_interactive = subparsers.add_parser('interactive',
- help='start in interactive mode')
- parser_interactive.add_argument('files', nargs='*',
- help='files to work with')
- parser_interactive.set_defaults(func=cmd_interactive)
-
- parser_dump = subparsers.add_parser('dump', help='dump all the objects')
- parser_dump.set_defaults(func=cmd_dump)
-
- parser_import = subparsers.add_parser('import',
- help='import objects encoded as json')
- parser_import.add_argument('--sign', action='store_true',
- help='import as new identity, and sign objects (if created_by_ref is different, new uuid is created)')
- parser_import.set_defaults(func=cmd_import)
-
- parser_drop = subparsers.add_parser('drop',
- help='drop the object specified by UUID')
- parser_drop.add_argument('uuids', nargs='+',
- help='UUID of object to drop')
- parser_drop.set_defaults(func=cmd_drop)
-
- parser_search = subparsers.add_parser('search',
- help='find objects', prefix_chars='@')
- parser_search.add_argument('args', nargs='+',
- help='args')
- parser_search.set_defaults(func=cmd_search)
-
- options = parser.parse_args()
-
- fun = options.func
- fun(options)
-
- if __name__ == '__main__': # pragma: no cover
- main()
-
- class _TestCanonicalCoder(unittest.TestCase):
- def test_con(self):
- # make a dict
- obja = {
- 'foo': 23984732, 'a': 5, 'b': 6,
- 'something': '2398472398723498273dfasdfjlaksdfj'
- }
-
- # reorder the items in it
- objaitems = list(obja.items())
- objaitems.sort()
- objb = dict(objaitems)
-
- # and they are still the same
- self.assertEqual(obja, objb)
-
- # This is to make sure that item order changed
- self.assertNotEqual(list(obja.items()), list(objb.items()))
-
- astr = pasn1.dumps(obja)
- bstr = pasn1.dumps(objb)
-
- # that they normally will be serialized differently
- self.assertNotEqual(astr, bstr)
-
- # but w/ the special encoder
- astr = _asn1coder.dumps(obja)
- bstr = _asn1coder.dumps(objb)
-
- # they are now encoded the same
- self.assertEqual(astr, bstr)
-
- class _TestMigrations(unittest.TestCase):
- def setUp(self):
- self._engine = create_engine('sqlite+pysqlite:///:memory:',
- echo=_sql_verbose, future=True)
-
- def test_f2131(self):
- # that an object store created at the original schema revision
- objstr = ObjectStore(self._engine, 'afad01589b76')
-
- # and a host object
- hostobj = Host(created_by_ref=uuid.uuid4(), hostuuid=uuid.uuid4())
-
- # build table metadata from original db
- mdo = sqlalchemy.schema.MetaData()
- mdobjstable = sqlalchemy.Table('metadata_objects', mdo, autoload_with=self._engine)
-
- with objstr._ses() as session:
- stmt = insert(mdobjstable).values(
- uuid=hostobj.uuid.hex, modified=hostobj.modified,
- data=hostobj.encode())
- session.execute(stmt)
-
- session.commit()
-
- # migrate the database forward
- objstr._handle_migration('head')
-
- # make sure we can query it
- self.assertEqual(list(objstr.get_hosts()), [ hostobj ])
- self.assertEqual(list(objstr), [ hostobj ])
- self.assertEqual(list(objstr.get_by_type('file')), [ ])
- self.assertEqual(list(objstr.get_by_type(FileObject)), [ ])
- self.assertEqual(list(objstr.get_by_type(Host)), [ hostobj ])
-
- #with objstr._ses() as session:
- # for i in session.query(orm.MetaDataObject).all():
- # _debprint('c:', repr(i))
-
- class _TestCases(unittest.TestCase):
- def setUp(self):
- self.fixtures = pathlib.Path('fixtures').resolve()
-
- d = pathlib.Path(tempfile.mkdtemp()).resolve()
- self.basetempdir = d
- self.tempdir = d / 'subdir'
-
- self.persona = Persona.load(os.path.join('fixtures',
- 'sample.persona.pasn1'))
- self.created_by_ref = self.persona.get_identity().uuid
-
- shutil.copytree(self.fixtures / 'testfiles', self.tempdir)
- shutil.copy(self.fixtures / 'sample.data.sqlite3', self.tempdir)
-
- self.oldcwd = os.getcwd()
-
- def tearDown(self):
- shutil.rmtree(self.basetempdir)
- self.tempdir = None
-
- os.chdir(self.oldcwd)
-
- def test_genstartstop(self):
- self.assertEqual(genstartstop(5, 0), (0, 5))
- self.assertEqual(genstartstop(5, 1), (0, 5))
- self.assertEqual(genstartstop(5, 4), (0, 5))
- self.assertEqual(genstartstop(25, 1), (0, 20))
- self.assertEqual(genstartstop(25, 20), (5, 25))
- self.assertEqual(genstartstop(25, 24), (5, 25))
- self.assertEqual(genstartstop(124, 1), (0, 20))
- self.assertEqual(genstartstop(124, 53), (43, 63))
- self.assertEqual(genstartstop(124, 120), (104, 124))
- self.assertEqual(genstartstop(124, 124), (104, 124))
-
- def test_fileobject(self):
- os.chdir(self.tempdir)
-
- engine = create_engine(
- "sqlite+pysqlite:///memdb1?mode=memory",
- echo=_sql_verbose, future=True)
-
- objst = ObjectStore(engine)
-
- a = self.persona.by_file('test.txt')
-
- # that the dir is absolute
- self.assertEqual(a.dir[0], '/')
-
- # make sure the file's hostid is a UUID
- self.assertIsInstance(a.hostid, uuid.UUID)
-
- # make sure the file's id is a UUID
- self.assertIsInstance(a.id, uuid.UUID)
-
- objst.loadobj(a)
-
- #_debprint('a:', repr(a))
- #_debprint('by_id:', objst.by_id(a.uuid))
-
- # write out the store
- objst.store('teststore.pasn1')
-
- # load it back in
- objstr = ObjectStore(engine)
-
- a = objstr.by_id(a['uuid'])
-
- # make sure the hostid is still a UUID
- self.assertIsInstance(a.hostid, uuid.UUID)
-
- # make sure the file's id is still a UUID
- self.assertIsInstance(a.id, uuid.UUID)
-
- # That it can be encoded to json
- jsfo = a.encode('json')
-
- # that it can be decoded from json
- jsloadedfo = MDBase.decode(jsfo, 'json')
-
- # and that it is equal
- self.assertEqual(jsloadedfo, a)
-
- def test_mdbase(self):
- self.assertRaises(ValueError, MDBase, created_by_ref='')
- self.assertRaises(ValueError, MDBase.create_obj,
- { 'type': 'unknosldkfj' })
- self.assertRaises(ValueError, MDBase.create_obj,
- { 'type': 'metadata' })
-
- baseobj = {
- 'type': 'metadata',
- 'created_by_ref': self.created_by_ref,
- }
- origbase = copy.deepcopy(baseobj)
-
- # that when an MDBase object is created
- md = MDBase.create_obj(baseobj)
-
- # it doesn't modify the passed in object (when adding
- # generated properties)
- self.assertEqual(baseobj, origbase)
-
- # and it has the generated properties
- # Note: cannot mock the functions as they are already
- # referenced at creation time
- self.assertIn('uuid', md)
- self.assertIn('modified', md)
-
- # That you can create a new version using new_version
- md2 = md.new_version(('dc:creator', 'Jim Bob',))
-
- # that they are different
- self.assertNotEqual(md, md2)
-
- # and that the new modified time is different from the old
- self.assertNotEqual(md.modified, md2.modified)
-
- # and that the modification is present
- self.assertEqual(md2['dc:creator'], [ 'Jim Bob' ])
-
- # that providing a value from common property
- fvalue = b'fakesig'
- md3 = md.new_version(('sig', fvalue))
-
- # gets set directly, and is not a list
- self.assertEqual(md3.sig, fvalue)
-
- # that invalid attribute access raises correct exception
- self.assertRaises(AttributeError, getattr, md,
- 'somerandombogusattribute')
-
- # that when re-adding an attribute that already exists
- md3 = md2.new_version(('dc:creator', 'Jim Bob',))
-
- # that only one exists
- self.assertEqual(md3['dc:creator'], [ 'Jim Bob' ])
-
- def test_mdbase_encode_decode(self):
- # that an object
- baseobj = {
- 'type': 'metadata',
- 'created_by_ref': self.created_by_ref,
- }
- obj = MDBase.create_obj(baseobj)
-
- # can be encoded
- coded = obj.encode()
-
- # and that the results can be decoded
- decobj = MDBase.decode(coded)
-
- # and that they are equal
- self.assertEqual(obj, decobj)
-
- # and in the encoded object
- eobj = _asn1coder.loads(coded)
-
- # the uuid property is a bytes instance
- self.assertIsInstance(eobj['uuid'], bytes)
-
- # and has the length of 16
- self.assertEqual(len(eobj['uuid']), 16)
-
- # and that json can be used to encode
- js = obj.encode('json')
-
- # and that it is valid json
- jsobj = json.loads(js)
-
- # and that it can be decoded
- jsdecobj = MDBase.decode(js, 'json')
-
- # and that it matches
- self.assertEqual(jsdecobj, obj)
-
- for key, inval in [
- ('modified', '2022-08-19T01:27:34.258676'),
- ('modified', '2022-08-19T01:27:34Z'),
- ('modified', '2022-08-19T01:27:34.258676+00:00'),
- ('uuid', 'z5336176-8086-4c21-984f-fda60ddaa172'),
- ('uuid', '05336176-8086-421-984f-fda60ddaa172'),
- ]:
- jsobj[key] = inval
- jstest = json.dumps(jsobj)
- self.assertRaises(ValueError, MDBase.decode, jstest, 'json')
-
- def test_mdbase_wrong_type(self):
- # that created_by_ref can be passed by kw
- obj = MetaData(created_by_ref=self.created_by_ref)
-
- self.assertRaises(ValueError, FileObject, dict(obj.items(False)))
-
- def test_makehash(self):
- self.assertRaises(ValueError, ObjectStore.makehash, 'slkj')
- self.assertRaises(ValueError, ObjectStore.makehash, 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA')
- self.assertRaises(ValueError, ObjectStore.makehash, 'bogushash:9e0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA', strict=False)
-
- self.assertEqual(ObjectStore.makehash('cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', strict=False), 'sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
- self.assertEqual(ObjectStore.makehash('e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', strict=False), 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')
-
- @staticmethod
- def statmock(fname):
- fname = pathlib.Path(fname)
- fnameparts = fname.parts
- subdiridx = fnameparts.index('subdir')
-
- _stats = {
- # repr on os.stat_result doesn't work
- # (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
-
- 'f': os.stat_result((stat.S_IROTH|stat.S_IXOTH|stat.S_IRGRP|stat.S_IXGRP|stat.S_IXUSR|stat.S_IRUSR|stat.S_IFDIR, 10, 100, 2, 100, 100, 1024, 1654166365, 1558388856.000000, 1663133775)),
- 'test.txt': os.stat_result((stat.S_IROTH|stat.S_IRGRP|stat.S_IWUSR|stat.S_IRUSR|stat.S_IFREG, 10, 100, 1, 100, 100, 15, 1654166365, 1558388856.000000, 1663133775)),
- 'newfile.txt': os.stat_result((stat.S_IROTH|stat.S_IRGRP|stat.S_IWUSR|stat.S_IRUSR|stat.S_IFREG, 10, 100, 1, 100, 100, 19, 1659652579, 1658982768.041291, 1663133775)),
- 'sample.data.sqlite3': os.stat_result((stat.S_IROTH|stat.S_IRGRP|stat.S_IWUSR|stat.S_IRUSR|stat.S_IFREG, 10, 100, 1, 100, 100, 57344, 1663133777, 1663133777.529757, 1663133777)),
- 't': os.stat_result((stat.S_IFDIR, 0, 0, 0, 0, 0, 0, 0, 0, 0)),
- 'z.jpg': os.stat_result((stat.S_IROTH|stat.S_IRGRP|stat.S_IWUSR|stat.S_IRUSR|stat.S_IFREG, 10, 100, 1, 100, 100, 332, 1661553878, 1661551130.361235, 1663134325)),
-
- }
-
- subpath = '/'.join(fnameparts[subdiridx + 1:])
- return _stats[subpath]
-
- @mock.patch('os.stat')
- def test_enumeratedir(self, statmock):
- statmock.side_effect = self.statmock
-
- files = enumeratedir(self.tempdir, self.created_by_ref)
- ftest = [ x for x in files if x.filename == 'test.txt' ][0]
- fname = 'test.txt'
-
- # make sure that they are of type MDBase
- self.assertIsInstance(ftest, MDBase)
-
- oldid = ftest.id
- self.assertEqual(ftest.filename, fname)
- self.assertEqual(ftest.dir, str(self.tempdir))
- # XXX - do we add host information?
- self.assertEqual(ftest.id, uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
- str(hostuuid()) + '/'.join(os.path.split(self.tempdir) +
- ( fname, ))))
- self.assertEqual(ftest.mtime, datetime.datetime(2019, 5, 20,
- 21, 47, 36, tzinfo=datetime.timezone.utc))
- self.assertEqual(ftest.size, 15)
- self.assertIn('sha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f', ftest.hashes)
-
- # XXX - make sure works w/ relative dirs
- files = enumeratedir(os.path.relpath(self.tempdir),
- self.created_by_ref)
- self.assertEqual(files[2].filename, 'test.txt')
- self.assertEqual(oldid, files[2].id)
-
- def test_mdbaseoverlay(self):
- engine = create_engine("sqlite+pysqlite:///:memory:", echo=_sql_verbose, future=True)
- objst = ObjectStore(engine)
-
- # that a base object
- bid = uuid.uuid4()
- objst.loadobj({
- 'type': 'metadata',
- 'uuid': bid,
- 'modified': datetime.datetime(2019, 6, 10, 14, 3, 10),
- 'created_by_ref': self.created_by_ref,
- 'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
- 'someprop': [ 'somevalue' ],
- 'lang': 'en',
- })
-
- # can have an overlay object
- oid = uuid.uuid4()
- dhash = 'sha256:a7c96262c21db9a06fd49e307d694fd95f624569f9b35bb3ffacd880440f9787'
- objst.loadobj({
- 'type': 'metadata',
- 'uuid': oid,
- 'modified': datetime.datetime(2019, 6, 10, 18, 3, 10),
- 'created_by_ref': self.created_by_ref,
- 'hashes': [ dhash ],
- 'parent_refs': [ bid ],
- 'lang': 'en',
- })
-
- # and that when you get its properties
- oobj = objst.by_id(oid)
- odict = dict(list(oobj.items()))
-
- # that it has the overlays property
- self.assertEqual(odict['parent_refs'], [ bid ])
-
- # that it doesn't have a common property
- self.assertNotIn('type', odict)
-
- # that when skipcommon is False
- odict = dict(oobj.items(False))
-
- # that it does have a common property
- self.assertIn('type', odict)
-
- def test_cryptography_persona(self):
- # Verify that a persona generated by cryptography still works
- persona = Persona.load(self.fixtures / 'cryptography.persona.pasn1')
-
- realpubkey = 'nFyLw6kB15DrM46ni9eEBRb6QD4rsPuco3ymj3mvz5YM8j3hY6chcjewU7FvqDpWALTSZ3E212SxCNErdYzPjgbxTnrYNyzeYTM2k58krEcKvWW6h'
- pubkey = persona.get_pubkey()
-
- self.assertEqual(realpubkey, pubkey)
-
- vpersona = Persona.from_pubkey(realpubkey)
-
- ident = persona.get_identity()
- vpersona.verify(ident)
-
- self.assertEqual(ident.uuid, uuid.UUID('52f1a92b-0c92-41e3-b647-356db89fb49c'))
-
- def test_persona(self):
- # that a newly created persona
- persona = Persona()
-
- # has an identity object
- idobj = persona.get_identity()
-
- # and that it has a uuid attribute that matches
- self.assertEqual(persona.uuid, idobj['uuid'])
-
- # that a key can be generated
- persona.generate_key()
-
- # that the pubkey property is present
- idobj = persona.get_identity()
- self.assertIsInstance(idobj['pubkey'], bytes)
-
- # that get_pubkey returns the correct thing
- pubstr = _asn1coder.dumps([ idobj.uuid, idobj['pubkey'] ])
- self.assertEqual(persona.get_pubkey(),
- base58.b58encode_check(pubstr).decode('ascii'))
-
- # and that there is a signature
- self.assertIsInstance(idobj['sig'], bytes)
-
- # and that it can verify itself
- persona.verify(idobj)
-
- # and that a new persona can be created from the pubkey
- pkpersona = Persona.from_pubkey(persona.get_pubkey())
-
- # and that it can verify the old identity
- self.assertTrue(pkpersona.verify(idobj))
-
- # that a second time, it raises an exception
- self.assertRaises(RuntimeError, persona.generate_key)
-
- # that a file object created by it
- testfname = os.path.join(self.tempdir, 'test.txt')
- testobj = persona.by_file(testfname)
-
- # has the correct created_by_ref
- self.assertEqual(testobj.created_by_ref, idobj.uuid)
-
- self.assertEqual(testobj.type, 'file')
-
- # and has a signature
- self.assertIn('sig', testobj)
-
- # that a persona created from the identity object
- vpersona = Persona(idobj)
-
- # can verify the sig
- self.assertTrue(vpersona.verify(testobj))
-
- # and that a bogus signature
- bogussig = 'somebogussig'
- bogusobj = MDBase.create_obj(testobj)
- bogusobj.sig = bogussig
-
- # fails to verify
- self.assertRaises(Exception, vpersona.verify, bogusobj)
-
- # and that a modified object
- otherobj = testobj.new_version(('customprop', 'value'))
-
- # fails to verify
- self.assertRaises(Exception, vpersona.verify, otherobj)
-
- # that a persona object can be written
- perpath = os.path.join(self.basetempdir, 'persona.pasn1')
- persona.store(perpath)
-
- # and that when loaded back
- loadpersona = Persona.load(perpath)
-
- # the new persona object can sign an object
- nvtestobj = loadpersona.sign(testobj.new_version())
-
- # and the old persona can verify it.
- self.assertTrue(vpersona.verify(nvtestobj))
-
- def test_persona_metadata(self):
- # that a persona
- persona = Persona()
- persona.generate_key()
-
- # can create a metadata object
- hashobj = ['asdlfkj']
- mdobj = persona.MetaData(hashes=hashobj)
-
- # that the object has the correct created_by_ref
- self.assertEqual(mdobj.created_by_ref, persona.uuid)
-
- # and has the provided hashes
- self.assertEqual(mdobj.hashes, hashobj)
-
- # and that it can be verified
- persona.verify(mdobj)
-
- def test_objectstore(self):
- persona = self.persona
- objst = ObjectStore.load(self.tempdir / 'sample.data.sqlite3')
-
- lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
- self.assertEqual(len(lst), 1)
-
- objst.loadobj({
- 'type': 'metadata',
- 'uuid': uuid.UUID('c9a1d1e2-3109-4efd-8948-577dc15e44e7'),
- 'modified': datetime.datetime(2019, 5, 31, 14, 3, 10,
- tzinfo=datetime.timezone.utc),
- 'created_by_ref': self.created_by_ref,
- 'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
- 'lang': 'en',
- })
-
- lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
- self.assertEqual(len(lst), 2)
-
- byid = objst.by_id('3e466e06-45de-4ecc-84ba-2d2a3d970e96')
-
- self.assertIsInstance(byid, MetaData)
- self.assertIn(byid, lst)
-
- r = byid
-
- self.assertEqual(r.uuid, uuid.UUID('3e466e06-45de-4ecc-84ba-2d2a3d970e96'))
- self.assertEqual(r['dc:creator'], [ 'John-Mark Gurney' ])
-
- # XXX do we care anymore?
- if False:
- # test storing the object store
- fname = 'testfile.sqlite3'
- objst.store(fname)
-
- with open(fname, 'rb') as fp:
- objs = _asn1coder.loads(fp.read())
-
- os.unlink(fname)
-
- self.assertEqual(len(objs), len(objst))
-
- self.assertEqual(objs['created_by_ref'], self.created_by_ref.bytes)
-
- # make sure that the read back data matches
- for i in objs['objects']:
- i['created_by_ref'] = uuid.UUID(bytes=i['created_by_ref'])
- i['uuid'] = uuid.UUID(bytes=i['uuid'])
- self.assertEqual(objst.by_id(i['uuid']), i)
-
- # that a file
- testfname = os.path.join(self.tempdir, 'test.txt')
-
- # when registered
- objst.loadobj(persona.by_file(testfname))
-
- # can be found
- self.assertEqual(objst.by_file(testfname), [ byid ])
-
- self.assertRaises(KeyError, objst.by_file, '/dev/null')
-
- # that when a metadata object
- mdouuid = 'c9a1d1e2-3109-4efd-8948-577dc15e44e7'
- origobj = objst.by_id(mdouuid)
-
- # is updated:
- obj = origobj.new_version(('foo', 'bar'))
-
- # and stored
- objst.loadobj(obj)
-
- # that it is the new one
- self.assertEqual(obj, objst.by_id(mdouuid))
-
- # and that the old one isn't present anymore in by file
- lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
- self.assertNotIn(origobj, lst)
-
- # XXX make sure that object store contains fileobject
-
- # Tests to add:
- # Non-duplicates when same metadata is located by multiple hashes.
-
- def objcompare(self, fullobjs, partialobjs):
- fullobjs = list(fullobjs)
- #_debprint('objs:', repr(fullobjs))
- self.assertEqual(len(fullobjs), len(partialobjs))
-
- missing = []
- for i in partialobjs:
- for idx, j in enumerate(fullobjs):
- # compare only on the keys the partial object has
- cmpobj = { k: v for k, v in j.items() if k in i }
- if cmpobj == i:
- break
- else: # pragma: no cover
- # no full object matched this partial object
- missing.append(i)
- continue
-
- # consume the match so it cannot be matched again
- fullobjs.pop(idx)
-
- if missing: # pragma: no cover
- _debprint('remaining objs:', repr(fullobjs))
- self.fail('Unable to find objects %s in dump' % missing)
-
- def run_command_file(self, f):
- '''Run the CLI commands in the JSON fixture file f,
- checking each command's output and exit code.'''
-
- with open(f) as fp:
- cmds = json.load(fp)
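-
- # Each entry drives one CLI invocation. An illustrative
- # entry (hypothetical, not copied from a fixture) might
- # look like:
- #
- # {
- # "title": "list a file",
- # "cmd": [ "list", "{testfname}" ],
- # "format": [ "cmd" ],
- # "stdout_re": "hashes:",
- # "exit": 0
- # }
- #
- # Other keys handled below: skip, special (with its own
- # arguments such as count, hostid, complete and files),
- # stdin, stdout, stdout_nre, stdout_check and stderr.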
-
- # setup object store
- storefname = self.tempdir / 'storefname'
- identfname = self.tempdir / 'identfname'
- cachefname = self.tempdir / 'cachefname'
-
- # setup path mapping
- def expandusermock(arg):
- if arg == '~/.medashare_store.sqlite3':
- return storefname
- elif arg == '~/.medashare_identity.pasn1':
- return identfname
- elif arg == '~/.medashare_cache.pasn1':
- return cachefname
-
- # any other path lookup is a test bug
- if True: # pragma: no cover
- raise NotImplementedError(arg)
-
- # setup test fname
- testfname = os.path.join(self.tempdir, 'test.txt')
- newtestfname = os.path.join(self.tempdir, 'newfile.txt')
-
- patches = []
-
- for cmd in cmds:
- try:
- if cmd['skip']: # pragma: no cover
- continue
- except KeyError:
- pass
-
- # expand {testfname}-style placeholders using the
- # locals defined above (testfname, newtestfname, etc.)
- for i in cmd.get('format', []):
- if i in { 'cmd', 'files' }:
- fmtvars = locals()
- cmd[i] = [ x.format(**fmtvars) for x in cmd[i] ]
- else:
- cmd[i] = cmd[i].format(**locals())
-
- try:
- special = cmd['special']
- except KeyError:
- pass
- else:
- if special == 'copy newfile.txt to test.txt':
- shutil.copy(newtestfname, testfname)
- elif special == 'change newfile.txt':
- with open(newtestfname, 'w') as fp:
- fp.write('some new contents')
- elif special == 'verify store object cnt':
- objst = ObjectStore.load(storefname)
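- # __len__ must agree with iteration and with the
- # fixture's expected count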
- objcnt = len(objst)
- self.assertEqual(objcnt, len(list(objst)))
- self.assertEqual(objcnt, cmd['count'])
- elif special == 'set hostid':
- hostidpatch = mock.patch(__name__ + '.hostuuid')
- hid = cmd['hostid'] if 'hostid' in cmd else uuid.uuid4()
- hostidpatch.start().return_value = hid
- patches.append(hostidpatch)
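- # the patch remains active for later commands and is
- # stopped at the end of the command file (see below)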
- elif special == 'iter is unique':
- objst = ObjectStore.load(storefname)
- uniqobjs = len({ x['uuid'] for x in objst })
- self.assertEqual(len(list(objst)), uniqobjs)
- elif special == 'setup bittorrent files':
- # copy in the torrent file
- # ensure the importlib.resources submodule is loaded
- import importlib.resources
- tor = importlib.resources.files('medashare.btv')
- tor = tor / 'fixtures' / 'somedir.torrent'
- shutil.copy(tor, self.tempdir)
-
- # partly recreate files
- btfiles = bttestcase.origfiledata.copy()
-
- if not cmd['complete']:
- btfiles.update(bttestcase.badfiles)
-
- sd = self.tempdir / bttestcase.dirname
- sd.mkdir(exist_ok=True)
-
- bttestcase.make_files(sd, btfiles)
- elif special == 'setup mapping paths':
- mappatha = self.tempdir / 'mapa'
- mappatha.mkdir()
-
- mappathb = self.tempdir / 'mapb'
- mappathb.mkdir()
-
- filea = mappatha / 'text.txt'
- filea.write_text('abc123\n')
- fileb = mappathb / 'text.txt'
- shutil.copyfile(filea, fileb)
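- # copy metadata (mtime, etc.) too so both files
- # stat the same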
- shutil.copystat(filea, fileb)
- elif special == 'delete files':
- for i in cmd['files']:
- os.unlink(i)
- elif special == 'setup tar file':
- shutil.copy(self.fixtures /
- 'testfile.tar.gz', self.tempdir)
- else: # pragma: no cover
- raise ValueError('unhandled special: %s' % repr(special))
-
- # coverage bug, fixed in 3.10:
- # https://github.com/nedbat/coveragepy/issues/1432#event-7130600158
- if True: # pragma: no cover
- continue
-
- with self.subTest(file=f, title=cmd['title']), \
- mock.patch('os.path.expanduser',
- side_effect=expandusermock) as eu, \
- mock.patch('sys.stdin', io.StringIO()) as stdin, \
- mock.patch('sys.stdout', io.StringIO()) as stdout, \
- mock.patch('sys.stderr', io.StringIO()) as stderr, \
- mock.patch('sys.argv', [ 'progname', ] +
- cmd['cmd']) as argv:
-
- # if there is stdin
- test_stdin = cmd.get('stdin', '')
-
- # provide it
- stdin.write(test_stdin)
- stdin.seek(0)
-
- with self.assertRaises(SystemExit) as cm:
- main()
-
- # XXX - Minor hack till other tests fixed
- sys.exit(0)
-
- # with the correct output
- self.maxDiff = None
- outeq = cmd.get('stdout')
- outnre = cmd.get('stdout_nre')
- outre = cmd.get('stdout_re')
- outcheck = cmd.get('stdout_check')
- # developer note: pretty-print a captured object stream:
- # python3 -c 'import ast, sys; print(ast.literal_eval(sys.stdin.read()))' << EOF | jq '.'
- if outnre:
- self.assertNotRegex(stdout.getvalue(), outnre)
- if outre:
- self.assertRegex(stdout.getvalue(), outre)
- if outeq:
- self.assertEqual(stdout.getvalue(), outeq)
- if outcheck:
- stdout.seek(0)
- self.objcompare(_json_objstream(stdout), outcheck)
-
- self.assertEqual(stderr.getvalue(), cmd.get('stderr', ''))
-
- self.assertEqual(cm.exception.code, cmd.get('exit', 0))
-
- patches.reverse()
- for i in patches:
- i.stop()
-
- def test_get_paths(self):
- # Test to make sure get paths works as expected.
- with mock.patch('os.path.expanduser') as eu:
- a, b, c = _get_paths(None)
-
- eu.assert_any_call('~/.medashare_identity.pasn1')
- eu.assert_any_call('~/.medashare_store.sqlite3')
- eu.assert_any_call('~/.medashare_cache.pasn1')
-
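- # and that when MEDASHARE_PATH is set in the
- # environment, the paths are rooted there instead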
- pathpref = pathlib.Path('/somepath/somewhere')
- with mock.patch.dict(os.environ, dict(MEDASHARE_PATH=str(pathpref))):
- i, s, c = _get_paths(None)
-
- self.assertEqual(i, str(pathpref / '.medashare_identity.pasn1'))
- self.assertEqual(s, str(pathpref / '.medashare_store.sqlite3'))
- self.assertEqual(c, str(pathpref / '.medashare_cache.pasn1'))
-
- def test_help(self):
- # that subcommand help is the same as --help
-
- with mock.patch('sys.stdout', io.StringIO()) as stdout, \
- mock.patch('sys.argv', [ 'progname', '--help', ]) as argv:
- with self.assertRaises(SystemExit) as cm:
- main()
-
- # XXX - Minor hack till other tests fixed
- sys.exit(0)
-
- dashhelp = stdout.getvalue()
-
- with mock.patch('sys.stdout', io.StringIO()) as stdout, \
- mock.patch('sys.argv', [ 'progname', 'help', ]) as argv:
- with self.assertRaises(SystemExit) as cm:
- main()
-
- # XXX - Minor hack till other tests fixed
- sys.exit(0)
-
- subhelp = stdout.getvalue()
-
- self.assertEqual(dashhelp, subhelp)
-
- def test_cmds(self):
- cmds = sorted(self.fixtures.glob('cmd.*.json'))
-
- for i in cmds:
- # make sure each file starts with a clean slate
- self.tearDown()
- self.setUp()
-
- os.chdir(self.tempdir)
- with self.subTest(file=i):
- self.run_command_file(i)
-
- # XXX - the following test may no longer be needed
- def test_main(self):
- # Test the main runner. This only tests things specific
- # to running the program, such as where the store is
- # created.
-
- # setup object store
- storefname = self.tempdir / 'storefname'
- identfname = self.tempdir / 'identfname'
- cachefname = self.tempdir / 'cachefname'
-
- # setup path mapping
- def expandusermock(arg):
- if arg == '~/.medashare_store.sqlite3':
- return storefname
- elif arg == '~/.medashare_identity.pasn1':
- return identfname
- elif arg == '~/.medashare_cache.pasn1':
- return cachefname
-
- # setup test fname
- testfname = os.path.join(self.tempdir, 'test.txt')
- newtestfname = os.path.join(self.tempdir, 'newfile.txt')
-
- with mock.patch('os.path.expanduser', side_effect=expandusermock) \
- as eu, mock.patch('medashare.cli.open') as op:
- # that when opening the store and identity fails
- op.side_effect = FileNotFoundError
-
- # and there is no identity
- with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', 'list', 'afile' ]) as argv:
- with self.assertRaises(SystemExit) as cm:
- main()
-
- # that it fails
- self.assertEqual(cm.exception.code, 1)
-
- # with the correct error message
- self.assertEqual(stderr.getvalue(),
- 'ERROR: Identity not created, create w/ genident.\n')
-
- with mock.patch('os.path.expanduser', side_effect=expandusermock) \
- as eu:
- # that generating a new identity
- with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'genident', 'name=A Test User' ]) as argv:
- main()
-
- # does not output anything
- self.assertEqual(stdout.getvalue(), '')
-
- # looks up the correct file
- eu.assert_any_call('~/.medashare_identity.pasn1')
- eu.assert_any_call('~/.medashare_store.sqlite3')
- eu.assert_any_call('~/.medashare_cache.pasn1')
-
- # and that the identity
- persona = Persona.load(identfname)
- pident = persona.get_identity()
-
- # has the correct name
- self.assertEqual(pident.name, 'A Test User')
-
- # that when generating an identity when one already exists
- with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', 'genident', 'name=A Test User' ]) as argv:
- # that it exits
- with self.assertRaises(SystemExit) as cm:
- main()
-
- # with error code 1
- self.assertEqual(cm.exception.code, 1)
-
- # and outputs an error message
- self.assertEqual(stderr.getvalue(),
- 'Error: Identity already created.\n')
-
- # and looked up the correct file
- eu.assert_any_call('~/.medashare_identity.pasn1')
-
- # that when updating the identity
- with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'ident', 'name=Changed Name' ]) as argv:
- main()
-
- # it doesn't output anything
- self.assertEqual(stdout.getvalue(), '')
-
- # and looked up the correct file
- eu.assert_any_call('~/.medashare_identity.pasn1')
-
- npersona = Persona.load(identfname)
- nident = npersona.get_identity()
-
- # and has the new name
- self.assertEqual(nident.name, 'Changed Name')
-
- # and has the same old uuid
- self.assertEqual(nident.uuid, pident.uuid)
-
- # and that the modified date has changed
- self.assertNotEqual(pident.modified, nident.modified)
-
- # and that the old Persona can verify the new one
- self.assertTrue(persona.verify(nident))
-
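- # keep a reference to the real open() before it is
- # mocked out below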
- orig_open = open
- with mock.patch('os.path.expanduser', side_effect=expandusermock) \
- as eu, mock.patch('medashare.cli.open') as op:
- # that when the store fails
- def open_repl(fname, mode):
- #print('or:', repr(fname), repr(mode), file=sys.stderr)
- self.assertIn(mode, ('rb', 'wb'))
-
- # let identity reads and all writes through, but
- # fail any other read (the store and cache)
- if fname == identfname or mode == 'wb':
- return orig_open(fname, mode)
-
- #print('foo:', repr(fname), repr(mode), file=sys.stderr)
- if True: # pragma: no cover
- raise FileNotFoundError
-
- op.side_effect = open_repl
-
- # and there is no store
- with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', 'list', 'foo', ]) as argv:
- # that it exits
- with self.assertRaises(SystemExit) as cm:
- main()
-
- # with error code 1
- self.assertEqual(cm.exception.code, 1)
-
- # and outputs an error message
- self.assertEqual(stderr.getvalue(),
- 'ERROR: file not found: \'foo\'\n')
-
- # Tests to add:
- # dump mappings (mappings with no args)
- # expand mappings to multiple mappings, that is a -> b, b -> c, implies a -> c
- # support host names in --create
|