#!/usr/bin/env python #import pdb, sys; mypdb = pdb.Pdb(stdout=sys.stderr); mypdb.set_trace() from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, \ Ed448PublicKey from cryptography.hazmat.primitives.serialization import Encoding, \ PrivateFormat, PublicFormat, NoEncryption import base58 import copy import datetime import functools import hashlib import mock import os.path import pasn1 import shutil import string import tempfile import unittest import uuid from contextlib import nested # The UUID for the namespace representing the path to a file _NAMESPACE_MEDASHARE_PATH = uuid.UUID('f6f36b62-3770-4a68-bc3d-dc3e31e429e6') _defaulthash = 'sha512' _validhashes = set([ 'sha256', 'sha512' ]) _hashlengths = { len(getattr(hashlib, x)().hexdigest()): x for x in _validhashes } def _iterdictlist(obj): itms = obj.items() itms.sort() for k, v in itms: if isinstance(v, list): v = v[:] v.sort() for i in v: yield k, i else: yield k, v def _makeuuid(s): if isinstance(s, uuid.UUID): return s return uuid.UUID(bytes=s) # XXX - known issue, store is not atomic/safe, overwrites in place instead of renames # XXX - add validation # XXX - how to add singletons class MDBase(object): '''This is a simple wrapper that turns a JSON object into a pythonesc object where attribute accesses work.''' _type = 'invalid' _generated_properties = { 'uuid': uuid.uuid4, 'modified': datetime.datetime.utcnow } # When decoding, the decoded value should be passed to this function # to get the correct type _instance_properties = { 'uuid': _makeuuid, 'created_by_ref': _makeuuid, } _common_properties = [ 'type', 'created_by_ref' ] # XXX - add lang? _common_optional = set(('parent_refs', 'sig')) _common_names = set(_common_properties + _generated_properties.keys()) def __init__(self, obj={}, **kwargs): obj = copy.deepcopy(obj) obj.update(kwargs) if self._type == MDBase._type: raise ValueError('call MDBase.create_obj instead so correct class is used.') if 'type' in obj and obj['type'] != self._type: raise ValueError( 'trying to create the wrong type of object, got: %s, expected: %s' % (`obj['type']`, `self._type`)) if 'type' not in obj: obj['type'] = self._type for x in self._common_properties: if x not in obj: raise ValueError('common property %s not present' % `x`) for x, fun in self._instance_properties.iteritems(): if x in obj: obj[x] = fun(obj[x]) for x, fun in self._generated_properties.iteritems(): if x not in obj: obj[x] = fun() self._obj = obj @classmethod def create_obj(cls, obj): '''Using obj as a base, create an instance of MDBase of the correct type. If the correct type is not found, a ValueError is raised.''' if isinstance(obj, cls): obj = obj._obj ty = obj['type'] for i in MDBase.__subclasses__(): if i._type == ty: return i(obj) else: raise ValueError('Unable to find class for type %s' % `ty`) def new_version(self, *args): '''For each k, v pari, add the property k as an additional one (or new one if first), with the value v.''' obj = copy.deepcopy(self._obj) common = self._common_names | self._common_optional for k, v in args: if k in common: obj[k] = v else: obj.setdefault(k, []).append(v) del obj['modified'] return self.create_obj(obj) def __repr__(self): # pragma: no cover return '%s(%s)' % (self.__class__.__name__, `self._obj`) def __getattr__(self, k): try: return self._obj[k] except KeyError: raise AttributeError(k) def __setattr__(self, k, v): if k[0] == '_': # direct attribute self.__dict__[k] = v else: self._obj[k] = v def __getitem__(self, k): return self._obj[k] def __to_dict__(self): return self._obj def __eq__(self, o): return cmp(self._obj, o) == 0 def __contains__(self, k): return k in self._obj def items(self, skipcommon=True): return [ (k, v) for k, v in self._obj.iteritems() if not skipcommon or k not in self._common_names ] def encode(self): return _asn1coder.dumps(self) @classmethod def decode(cls, s): return cls.create_obj(_asn1coder.loads(s)) class MetaData(MDBase): _type = 'metadata' class Identity(MDBase): _type = 'identity' # Identites don't need a created by _common_properties = [ x for x in MDBase._common_properties if x != 'created_by_ref' ] _common_optional = set([ x for x in MDBase._common_optional if x != 'parent_refs' ] + [ 'name', 'pubkey' ]) _common_names = set(_common_properties + MDBase._generated_properties.keys()) def _trytodict(o): if isinstance(o, uuid.UUID): return 'bytes', o.bytes try: return 'dict', o.__to_dict__() except Exception: # pragma: no cover raise TypeError('unable to find __to_dict__ on %s: %s' % (type(o), `o`)) class CanonicalCoder(pasn1.ASN1DictCoder): def enc_dict(self, obj, **kwargs): class FakeIter: def iteritems(self): itms = obj.items() itms.sort() return iter(itms) return pasn1.ASN1DictCoder.enc_dict(self, FakeIter(), **kwargs) _asn1coder = CanonicalCoder(coerce=_trytodict) class Persona(object): '''The object that represents a persona, or identity. It will create the proper identity object, serialize for saving keys, create objects for that persona and other management.''' def __init__(self, identity=None, key=None): if identity is None: self._identity = Identity() else: self._identity = identity self._key = key self._pubkey = None if 'pubkey' in self._identity: pubkeybytes = self._identity.pubkey self._pubkey = Ed448PublicKey.from_public_bytes(pubkeybytes) self._created_by_ref = self._identity.uuid def MetaData(self, *args, **kwargs): kwargs['created_by_ref'] = self.uuid return self.sign(MetaData(*args, **kwargs)) @property def uuid(self): '''Return the UUID of the identity represented.''' return self._identity.uuid def __repr__(self): # pragma: no cover r = '' % \ (self._key is not None, self._pubkey is not None, `self._identity`) return r @classmethod def from_pubkey(cls, pubkeystr): pubstr = base58.b58decode_check(pubkeystr) uuid, pubkey = _asn1coder.loads(pubstr) ident = Identity(uuid=uuid, pubkey=pubkey) return cls(ident) def get_identity(self): '''Return the Identity object for this Persona.''' return self._identity def get_pubkey(self): '''Get a printable version of the public key. This is used for importing into different programs, or for shared.''' idobj = self._identity pubstr = _asn1coder.dumps([ idobj.uuid, idobj.pubkey ]) return base58.b58encode_check(pubstr) def new_version(self, *args): '''Update the Persona's Identity object.''' self._identity = self.sign(self._identity.new_version(*args)) return self._identity def store(self, fname): '''Store the Persona to a file. If there is a private key associated w/ the Persona, it will be saved as well.''' with open(fname, 'w') as fp: obj = { 'identity': self._identity, } if self._key is not None: obj['key'] = \ self._key.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) fp.write(_asn1coder.dumps(obj)) @classmethod def load(cls, fname): '''Load the Persona from the provided file.''' with open(fname) as fp: objs = _asn1coder.loads(fp.read()) kwargs = {} if 'key' in objs: kwargs['key'] = Ed448PrivateKey.from_private_bytes(objs['key']) return cls(Identity(objs['identity']), **kwargs) def generate_key(self): '''Generate a key for this Identity. Raises a RuntimeError if a key is already present.''' if self._key: raise RuntimeError('a key already exists') self._key = Ed448PrivateKey.generate() self._pubkey = self._key.public_key() pubkey = self._pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) self._identity = self.sign(self._identity.new_version(('pubkey', pubkey))) def _makesigbytes(self, obj): obj = dict(obj.items(False)) try: del obj['sig'] except KeyError: pass return _asn1coder.dumps(obj) def sign(self, obj): '''Takes the object, adds a signature, and returns the new object.''' sigbytes = self._makesigbytes(obj) sig = self._key.sign(sigbytes) newobj = MDBase.create_obj(obj) newobj.sig = sig return newobj def verify(self, obj): sigbytes = self._makesigbytes(obj) pubkey = self._pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) self._pubkey.verify(obj['sig'], sigbytes) return True def by_file(self, fname): '''Return a metadata object for the file named fname.''' fobj = FileObject.from_file(fname, self._created_by_ref) return self.sign(fobj) class ObjectStore(object): '''A container to store for the various Metadata objects.''' # The _uuids property contains both the UUIDv4 for objects, and # looking up the UUIDv5 for FileObjects. def __init__(self, created_by_ref): self._created_by_ref = created_by_ref self._uuids = {} self._hashes = {} @staticmethod def makehash(hashstr, strict=True): '''Take a hash or hash string, and return a valid hash string from it. This makes sure that it is of the correct type and length. If strict is False, the function will detect the length and return a valid hash string if one can be found. By default, the string must be prepended by the type, followed by a colon, followed by the value in hex in all lower case characters.''' try: hash, value = hashstr.split(':') except ValueError: if strict: raise hash = _hashlengths[len(hashstr)] value = hashstr if strict and len(str(value).translate(None, string.hexdigits.lower())) != 0: raise ValueError('value has invalid hex digits (must be lower case)', value) if hash in _validhashes: return ':'.join((hash, value)) raise ValueError def __len__(self): return len(self._uuids) def store(self, fname): '''Write out the objects in the store to the file named fname.''' with open(fname, 'w') as fp: obj = { 'created_by_ref': self._created_by_ref, 'objects': self._uuids.values(), } fp.write(_asn1coder.dumps(obj)) def loadobj(self, obj): '''Load obj into the data store.''' obj = MDBase.create_obj(obj) self._uuids[obj.uuid] = obj for j in obj.hashes: h = self.makehash(j) self._hashes.setdefault(h, []).append(obj) @classmethod def load(cls, fname): '''Load objects from the provided file name. Basic validation will be done on the objects in the file. The objects will be accessible via other methods.''' with open(fname) as fp: objs = _asn1coder.loads(fp.read()) obj = cls(objs['created_by_ref']) for i in objs['objects']: obj.loadobj(i) return obj def by_id(self, id): '''Look up an object by it's UUID.''' if not isinstance(id, uuid.UUID): uid = uuid.UUID(id) else: uid = id return self._uuids[uid] def by_hash(self, hash): '''Look up an object by it's hash value.''' h = self.makehash(hash, strict=False) return self._hashes[h] def by_file(self, fname): '''Return a metadata object for the file named fname.''' fid = FileObject.make_id(fname) try: fobj = self.by_id(fid) except KeyError: # unable to find it fobj = FileObject.from_file(fname, self._created_by_ref) self.loadobj(fobj) for i in fobj.hashes: j = self.by_hash(i) # Filter out non-metadata objects j = [ x for x in j if x.type == 'metadata' ] if j: return j else: raise KeyError('unable to find metadata for file') def _hashfile(fname): hash = getattr(hashlib, _defaulthash)() with open(fname) as fp: r = fp.read() hash.update(r) return '%s:%s' % (_defaulthash, hash.hexdigest()) class FileObject(MDBase): _type = 'file' @staticmethod def make_id(fname): '''Take a local file name, and make the id for it. Note that converts from the local path separator to a forward slash so that it will be the same between Windows and Unix systems.''' fname = os.path.realpath(fname) return uuid.uuid5(_NAMESPACE_MEDASHARE_PATH, '/'.join(os.path.split(fname))) @classmethod def from_file(cls, filename, created_by_ref): s = os.stat(filename) obj = { 'dir': os.path.dirname(filename), 'created_by_ref': created_by_ref, 'filename': os.path.basename(filename), 'id': cls.make_id(filename), 'mtime': datetime.datetime.utcfromtimestamp(s.st_mtime), 'size': s.st_size, 'hashes': [ _hashfile(filename), ], } return cls(obj) def enumeratedir(_dir, created_by_ref): '''Enumerate all the files and directories (not recursive) in _dir. Returned is a list of FileObjects.''' return map(lambda x: FileObject.from_file(os.path.join(_dir, x), created_by_ref), os.listdir(_dir)) def main(): from optparse import OptionParser import sys parser = OptionParser() parser.add_option('-a', action='append', dest='add', default=[], help='add the arg as metadata for files, tag=value') parser.add_option('-d', action='append', dest='delete', default=[], help='delete the arg as metadata from files. Either specify tag, and all tags are removed, or specify tag=value and that specific tag will be removed.') parser.add_option('-g', action='store_true', dest='generateident', default=False, help='generate an identity') parser.add_option('-i', action='store_true', dest='updateident', default=False, help='update the identity') parser.add_option('-l', action='store_true', dest='list', default=False, help='list metadata') parser.add_option('-p', action='store_true', dest='printpub', default=False, help='Print the public key of the identity') options, args = parser.parse_args() # this is shared between generateident and add addprops = map(lambda x: x.split('=', 1), options.add) if options.generateident or options.updateident or options.printpub: identfname = os.path.expanduser('~/.medashare_identity.pasn1') if options.generateident and os.path.exists(identfname): print >>sys.stderr, 'Error: Identity already created.' sys.exit(1) if options.generateident: persona = Persona() persona.generate_key() else: persona = Persona.load(identfname) if options.printpub: print persona.get_pubkey() return persona.new_version(*addprops) persona.store(identfname) return storefname = os.path.expanduser('~/.medashare_store.pasn1') import sys #print >>sys.stderr, `storefname` objstr = ObjectStore.load(storefname) if options.list: for i in args: for j in objstr.by_file(i): #print >>sys.stderr, `j._obj` for k, v in _iterdictlist(j): print '%s:\t%s' % (k, v) elif options.add: for i in args: for j in objstr.by_file(i): nobj = j.new_version(*addprops) objstr.loadobj(nobj) elif options.delete: for i in args: for j in objstr.by_file(i): obj = j.__to_dict__() for k in options.delete: try: key, v = k.split('=', 1) obj[key].remove(v) except ValueError: del obj[k] nobj = MDBase.create_obj(obj) objstr.loadobj(nobj) else: # pragma: no cover raise NotImplementedError objstr.store(storefname) if __name__ == '__main__': # pragma: no cover main() class _TestCononicalCoder(unittest.TestCase): def test_con(self): obja = { 'foo': 23984732, 'a': 5, 'b': 6, 'something': '2398472398723498273dfasdfjlaksdfj' } objaitems = obja.items() objaitems.sort() objb = dict(objaitems) self.assertEqual(obja, objb) # This is to make sure that item order changed self.assertNotEqual(obja.items(), objb.items()) astr = pasn1.dumps(obja) bstr = pasn1.dumps(objb) self.assertNotEqual(astr, bstr) astr = _asn1coder.dumps(obja) bstr = _asn1coder.dumps(objb) self.assertEqual(astr, bstr) class _TestCases(unittest.TestCase): def setUp(self): d = os.path.realpath(tempfile.mkdtemp()) self.basetempdir = d self.tempdir = os.path.join(d, 'subdir') persona = Persona.load(os.path.join('fixtures', 'sample.persona.pasn1')) self.created_by_ref = persona.get_identity().uuid shutil.copytree(os.path.join('fixtures', 'testfiles'), self.tempdir) def tearDown(self): shutil.rmtree(self.basetempdir) self.tempdir = None def test_mdbase(self): self.assertRaises(ValueError, MDBase, created_by_ref='') self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'unknosldkfj' }) self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'metadata' }) baseobj = { 'type': 'metadata', 'created_by_ref': self.created_by_ref, } origbase = copy.deepcopy(baseobj) # that when an MDBase object is created md = MDBase.create_obj(baseobj) # it doesn't modify the passed in object (when adding # generated properties) self.assertEqual(baseobj, origbase) # and it has the generted properties # Note: cannot mock the functions as they are already # referenced at creation time self.assertIn('uuid', md) self.assertIn('modified', md) # That you can create a new version using new_version md2 = md.new_version(('dc:creator', 'Jim Bob',)) # that they are different self.assertNotEqual(md, md2) # and that the new modified time is different from the old self.assertNotEqual(md.modified, md2.modified) # and that the modification is present self.assertEqual(md2['dc:creator'], [ 'Jim Bob' ]) # that providing a value from common property fvalue = 'fakesig' md3 = md.new_version(('sig', fvalue)) # gets set directly, and is not a list self.assertEqual(md3.sig, fvalue) # that invalid attribute access raises correct exception self.assertRaises(AttributeError, getattr, md, 'somerandombogusattribute') def test_mdbase_encode_decode(self): # that an object baseobj = { 'type': 'metadata', 'created_by_ref': self.created_by_ref, } obj = MDBase.create_obj(baseobj) # can be encoded coded = obj.encode() # and that the rsults can be decoded decobj = MDBase.decode(coded) # and that they are equal self.assertEqual(obj, decobj) # and in the encoded object eobj = _asn1coder.loads(coded) # the uuid property is a str instance self.assertIsInstance(eobj['uuid'], str) # and has the length of 16 self.assertEqual(len(eobj['uuid']), 16) def test_mdbase_wrong_type(self): # that created_by_ref can be passed by kw obj = MetaData(created_by_ref=self.created_by_ref) self.assertRaises(ValueError, FileObject, dict(obj.items(False))) def test_makehash(self): self.assertRaises(ValueError, ObjectStore.makehash, 'slkj') self.assertRaises(ValueError, ObjectStore.makehash, 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA') self.assertRaises(ValueError, ObjectStore.makehash, 'bogushash:9e0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA', strict=False) self.assertEqual(ObjectStore.makehash('cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', strict=False), 'sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e') self.assertEqual(ObjectStore.makehash('e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', strict=False), 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855') def test_enumeratedir(self): files = enumeratedir(self.tempdir, self.created_by_ref) ftest = files[0] fname = 'test.txt' # make sure that they are of type MDBase self.assertIsInstance(ftest, MDBase) oldid = ftest.id self.assertEqual(ftest.filename, fname) self.assertEqual(ftest.dir, self.tempdir) # XXX - do we add host information? self.assertEqual(ftest.id, uuid.uuid5(_NAMESPACE_MEDASHARE_PATH, '/'.join(os.path.split(self.tempdir) + ( fname, )))) self.assertEqual(ftest.mtime, datetime.datetime(2019, 5, 20, 21, 47, 36)) self.assertEqual(ftest.size, 15) self.assertIn('sha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f', ftest.hashes) # XXX - make sure works w/ relative dirs files = enumeratedir(os.path.relpath(self.tempdir), self.created_by_ref) self.assertEqual(oldid, files[0].id) def test_mdbaseoverlay(self): objst = ObjectStore(self.created_by_ref) # that a base object bid = uuid.uuid4() objst.loadobj({ 'type': 'metadata', 'uuid': bid, 'modified': datetime.datetime(2019, 6, 10, 14, 3, 10), 'created_by_ref': self.created_by_ref, 'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ], 'someprop': [ 'somevalue' ], 'lang': 'en', }) # can have an overlay object oid = uuid.uuid4() dhash = 'sha256:a7c96262c21db9a06fd49e307d694fd95f624569f9b35bb3ffacd880440f9787' objst.loadobj({ 'type': 'metadata', 'uuid': oid, 'modified': datetime.datetime(2019, 6, 10, 18, 3, 10), 'created_by_ref': self.created_by_ref, 'hashes': [ dhash ], 'parent_refs': [ bid ], 'lang': 'en', }) # and that when you get it's properties oobj = objst.by_id(oid) odict = dict(oobj.items()) # that is has the overlays property self.assertEqual(odict['parent_refs'], [ bid ]) # that it doesn't have a common property self.assertNotIn('type', odict) # that when skipcommon is False odict = dict(oobj.items(False)) # that it does have a common property self.assertIn('type', odict) def test_persona(self): # that a newly created persona persona = Persona() # has an identity object idobj = persona.get_identity() # and that it has a uuid attribute that matches self.assertEqual(persona.uuid, idobj['uuid']) # that a key can be generated persona.generate_key() # that the pubkey property is present idobj = persona.get_identity() self.assertIsInstance(idobj['pubkey'], str) # that get_pubkey returns the correct thing pubstr = _asn1coder.dumps([ idobj.uuid, idobj['pubkey'] ]) self.assertEqual(persona.get_pubkey(), base58.b58encode_check(pubstr)) # and that there is a signature self.assertIsInstance(idobj['sig'], str) # and that it can verify itself persona.verify(idobj) # and that a new persona can be created from the pubkey pkpersona = Persona.from_pubkey(persona.get_pubkey()) # and that it can verify the old identity self.assertTrue(pkpersona.verify(idobj)) # that a second time, it raises an exception self.assertRaises(RuntimeError, persona.generate_key) # that a file object created by it testfname = os.path.join(self.tempdir, 'test.txt') testobj = persona.by_file(testfname) # has the correct created_by_ref self.assertEqual(testobj.created_by_ref, idobj.uuid) # and has a signature self.assertIn('sig', testobj) # that a persona created from the identity object vpersona = Persona(idobj) # can verify the sig self.assertTrue(vpersona.verify(testobj)) # and that a bogus signature bogussig = 'somebogussig' bogusobj = MDBase.create_obj(testobj) bogusobj.sig = bogussig # fails to verify self.assertRaises(Exception, vpersona.verify, bogusobj) # and that a modified object otherobj = testobj.new_version(('customprop', 'value')) # fails to verify self.assertRaises(Exception, vpersona.verify, otherobj) # that a persona object can be written perpath = os.path.join(self.basetempdir, 'persona.pasn1') persona.store(perpath) # and that when loaded back loadpersona = Persona.load(perpath) # the new persona object can sign an object nvtestobj = loadpersona.sign(testobj.new_version()) # and the old persona can verify it. self.assertTrue(vpersona.verify(nvtestobj)) def test_persona_metadata(self): # that a persona persona = Persona() persona.generate_key() # can create a metadata object hashobj = ['asdlfkj'] mdobj = persona.MetaData(hashes=hashobj) # that the object has the correct created_by_ref self.assertEqual(mdobj.created_by_ref, persona.uuid) # and has the provided hashes self.assertEqual(mdobj.hashes, hashobj) # and that it can be verified persona.verify(mdobj) def test_objectstore(self): objst = ObjectStore.load(os.path.join('fixtures', 'sample.data.pasn1')) objst.loadobj({ 'type': 'metadata', 'uuid': uuid.UUID('c9a1d1e2-3109-4efd-8948-577dc15e44e7'), 'modified': datetime.datetime(2019, 5, 31, 14, 3, 10), 'created_by_ref': self.created_by_ref, 'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ], 'lang': 'en', }) lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada') self.assertEqual(len(lst), 2) byid = objst.by_id('3e466e06-45de-4ecc-84ba-2d2a3d970e96') self.assertIsInstance(byid, MetaData) self.assertIn(byid, lst) r = byid self.assertEqual(r.uuid, uuid.UUID('3e466e06-45de-4ecc-84ba-2d2a3d970e96')) self.assertEqual(r['dc:creator'], [ u'John-Mark Gurney' ]) fname = 'testfile.pasn1' objst.store(fname) with open(fname) as fp: objs = _asn1coder.loads(fp.read()) os.unlink(fname) self.assertEqual(len(objs), len(objst)) self.assertEqual(objs['created_by_ref'], self.created_by_ref.bytes) for i in objs['objects']: i['created_by_ref'] = uuid.UUID(bytes=i['created_by_ref']) i['uuid'] = uuid.UUID(bytes=i['uuid']) self.assertEqual(objst.by_id(i['uuid']), i) testfname = os.path.join(self.tempdir, 'test.txt') self.assertEqual(objst.by_file(testfname), [ byid ]) self.assertEqual(objst.by_file(testfname), [ byid ]) self.assertRaises(KeyError, objst.by_file, '/dev/null') # XXX make sure that object store contains fileobject # Tests to add: # Non-duplicates when same metadata is located by multiple hashes. def test_main(self): # Test the main runner, this is only testing things that are # specific to running the program, like where the store is # created. # setup object store storefname = os.path.join(self.tempdir, 'storefname') identfname = os.path.join(self.tempdir, 'identfname') shutil.copy(os.path.join('fixtures', 'sample.data.pasn1'), storefname) # setup path mapping def expandusermock(arg): if arg == '~/.medashare_store.pasn1': return storefname elif arg == '~/.medashare_identity.pasn1': return identfname # setup test fname testfname = os.path.join(self.tempdir, 'test.txt') import sys import StringIO import itertools with mock.patch('os.path.expanduser', side_effect=expandusermock) \ as eu: # that generating a new identity with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-g', '-a', 'name=A Test User' ])) as (stdout, argv): main() # does not output anything self.assertEqual(stdout.getvalue(), '') # looks up the correct file eu.assert_called_with('~/.medashare_identity.pasn1') # and that the identity persona = Persona.load(identfname) pident = persona.get_identity() # has the correct name self.assertEqual(pident.name, 'A Test User') # that when generating an identity when one already exists with nested(mock.patch('sys.stderr', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-g', '-a', 'name=A Test User' ])) as (stderr, argv): # that it exits with self.assertRaises(SystemExit) as cm: main() # with error code 1 self.assertEqual(cm.exception[0], 1) # and outputs an error message self.assertEqual(stderr.getvalue(), 'Error: Identity already created.\n') # and looked up the correct file eu.assert_called_with('~/.medashare_identity.pasn1') # that when updating the identity with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-i', '-a', 'name=Changed Name' ])) as (stdout, argv): main() # it doesn't output anything self.assertEqual(stdout.getvalue(), '') # and looked up the correct file eu.assert_called_with('~/.medashare_identity.pasn1') npersona = Persona.load(identfname) nident = npersona.get_identity() # and has the new name self.assertEqual(nident.name, 'Changed Name') # and has the same old uuid self.assertEqual(nident.uuid, pident.uuid) # and that the modified date has changed self.assertNotEqual(pident.modified, nident.modified) # and that the old Persona can verify the new one self.assertTrue(persona.verify(nident)) # that when asked to print the public key with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-p' ])) as (stdout, argv): main() # the correct key is printed self.assertEqual(stdout.getvalue(), '%s\n' % persona.get_pubkey()) # and looked up the correct file eu.assert_called_with('~/.medashare_identity.pasn1') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'dc:creator:\tJohn-Mark Gurney\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n') eu.assert_called_with('~/.medashare_store.pasn1') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-a', 'dc:creator=Another user', '-a', 'foo=bar=baz', testfname ])) as (stdout, argv): main() with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'dc:creator:\tAnother user\ndc:creator:\tJohn-Mark Gurney\nfoo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-d', 'dc:creator', testfname ])) as (stdout, argv): main() with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'foo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-a', 'foo=bleh', testfname ])) as (stdout, argv): main() with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'foo:\tbar=baz\nfoo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-d', 'foo=bar=baz', testfname ])) as (stdout, argv): main() with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'foo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')