#!/usr/bin/env python #import pdb, sys; mypdb = pdb.Pdb(stdout=sys.stderr); mypdb.set_trace() from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, \ Ed448PublicKey from cryptography.hazmat.primitives.serialization import Encoding, \ PrivateFormat, PublicFormat, NoEncryption import base58 import copy import datetime import hashlib import mock import os.path import pasn1 import shutil import string import tempfile import unittest import uuid from contextlib import nested # The UUID for the namespace representing the path to a file _NAMESPACE_MEDASHARE_PATH = uuid.UUID('f6f36b62-3770-4a68-bc3d-dc3e31e429e6') _defaulthash = 'sha512' _validhashes = set([ 'sha256', 'sha512' ]) _hashlengths = { len(getattr(hashlib, x)().hexdigest()): x for x in _validhashes } def _iterdictlist(obj): itms = obj.items() itms.sort() for k, v in itms: if isinstance(v, list): v = v[:] v.sort() for i in v: yield k, i else: yield k, v def _makeuuid(s): if isinstance(s, uuid.UUID): return s return uuid.UUID(bytes=s) # XXX - known issue, store is not atomic/safe, overwrites in place instead of renames # XXX - add validation # XXX - how to add singletons class MDBase(object): '''This is a simple wrapper that turns a JSON object into a pythonesc object where attribute accesses work.''' _type = 'invalid' _generated_properties = { 'uuid': uuid.uuid4, 'modified': datetime.datetime.utcnow } # When decoding, the decoded value should be passed to this function # to get the correct type _instance_properties = { 'uuid': _makeuuid, 'created_by_ref': _makeuuid, } _common_properties = [ 'type', 'created_by_ref' ] # XXX - add lang? _common_optional = set(('overlay_refs', 'sig')) _common_names = set(_common_properties + _generated_properties.keys()) def __init__(self, obj={}, **kwargs): obj = copy.deepcopy(obj) obj.update(kwargs) if self._type == MDBase._type: raise ValueError('call MDBase.create_obj instead so correct class is used.') if 'type' in obj and obj['type'] != self._type: raise ValueError( 'trying to create the wrong type of object, got: %s, expected: %s' % (`obj['type']`, `self._type`)) if 'type' not in obj: obj['type'] = self._type for x in self._common_properties: if x not in obj: raise ValueError('common property %s not present' % `x`) for x, fun in self._instance_properties.iteritems(): if x in obj: obj[x] = fun(obj[x]) for x, fun in self._generated_properties.iteritems(): if x not in obj: obj[x] = fun() self._obj = obj @classmethod def create_obj(cls, obj): '''Using obj as a base, create an instance of MDBase of the correct type. If the correct type is not found, a ValueError is raised.''' if isinstance(obj, cls): obj = obj._obj ty = obj['type'] for i in MDBase.__subclasses__(): if i._type == ty: return i(obj) else: raise ValueError('Unable to find class for type %s' % `ty`) def new_version(self, *args): '''For each k, v pari, add the property k as an additional one (or new one if first), with the value v.''' obj = copy.deepcopy(self._obj) common = self._common_names | self._common_optional for k, v in args: if k in common: obj[k] = v else: obj.setdefault(k, []).append(v) del obj['modified'] return self.create_obj(obj) def __repr__(self): # pragma: no cover return '%s(%s)' % (self.__class__.__name__, `self._obj`) def __getattr__(self, k): return self._obj[k] def __setattr__(self, k, v): if k[0] == '_': # direct attribute self.__dict__[k] = v else: self._obj[k] = v def __getitem__(self, k): return self._obj[k] def __to_dict__(self): return self._obj def __eq__(self, o): return cmp(self._obj, o) == 0 def __contains__(self, k): return k in self._obj def items(self, skipcommon=True): return [ (k, v) for k, v in self._obj.iteritems() if not skipcommon or k not in self._common_names ] def encode(self): return _asn1coder.dumps(self) @classmethod def decode(cls, s): return cls.create_obj(_asn1coder.loads(s)) class MetaData(MDBase): _type = 'metadata' class Identity(MDBase): _type = 'identity' # Identites don't need a created by _common_properties = [ x for x in MDBase._common_properties if x != 'created_by_ref' ] _common_optional = set([ x for x in MDBase._common_optional if x != 'overlay_refs' ] + [ 'name', 'pubkey' ]) _common_names = set(_common_properties + MDBase._generated_properties.keys()) def _trytodict(o): if isinstance(o, uuid.UUID): return 'bytes', o.bytes try: return 'dict', o.__to_dict__() except Exception: # pragma: no cover raise TypeError('unable to find __to_dict__ on %s: %s' % (type(o), `o`)) class CanonicalCoder(pasn1.ASN1DictCoder): def enc_dict(self, obj, **kwargs): class FakeIter: def iteritems(self): itms = obj.items() itms.sort() return iter(itms) return pasn1.ASN1DictCoder.enc_dict(self, FakeIter(), **kwargs) _asn1coder = CanonicalCoder(coerce=_trytodict) class Persona(object): '''The object that represents a persona, or identity. It will create the proper identity object, serialize for saving keys, create objects for that persona and other management.''' def __init__(self, identity=None, key=None): if identity is None: self._identity = Identity() else: self._identity = identity self._key = key self._pubkey = None if 'pubkey' in self._identity: pubkeybytes = self._identity.pubkey self._pubkey = Ed448PublicKey.from_public_bytes(pubkeybytes) self._created_by_ref = self._identity.uuid @property def uuid(self): '''Return the UUID of the identity represented.''' return self._identity.uuid def __repr__(self): # pragma: no cover r = '' % \ (self._key is not None, self._pubkey is not None, `self._identity`) return r @classmethod def from_pubkey(cls, pubkeystr): pubstr = base58.b58decode_check(pubkeystr) uuid, pubkey = _asn1coder.loads(pubstr) ident = Identity(uuid=uuid, pubkey=pubkey) return cls(ident) def get_identity(self): '''Return the Identity object for this Persona.''' return self._identity def get_pubkey(self): '''Get a printable version of the public key. This is used for importing into different programs, or for shared.''' idobj = self._identity pubstr = _asn1coder.dumps([ idobj.uuid, idobj.pubkey ]) return base58.b58encode_check(pubstr) def new_version(self, *args): '''Update the Persona's Identity object.''' self._identity = self.sign(self._identity.new_version(*args)) return self._identity def store(self, fname): '''Store the Persona to a file. If there is a private key associated w/ the Persona, it will be saved as well.''' with open(fname, 'w') as fp: obj = { 'identity': self._identity, } if self._key is not None: obj['key'] = \ self._key.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) fp.write(_asn1coder.dumps(obj)) @classmethod def load(cls, fname): '''Load the Persona from the provided file.''' with open(fname) as fp: objs = _asn1coder.loads(fp.read()) kwargs = {} if 'key' in objs: kwargs['key'] = Ed448PrivateKey.from_private_bytes(objs['key']) return cls(Identity(objs['identity']), **kwargs) def generate_key(self): '''Generate a key for this Identity. Raises a RuntimeError if a key is already present.''' if self._key: raise RuntimeError('a key already exists') self._key = Ed448PrivateKey.generate() self._pubkey = self._key.public_key() pubkey = self._pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) self._identity = self.sign(self._identity.new_version(('pubkey', pubkey))) def _makesigbytes(self, obj): obj = dict(obj.items(False)) try: del obj['sig'] except KeyError: pass return _asn1coder.dumps(obj) def sign(self, obj): '''Takes the object, adds a signature, and returns the new object.''' sigbytes = self._makesigbytes(obj) sig = self._key.sign(sigbytes) newobj = MDBase.create_obj(obj) newobj.sig = sig return newobj def verify(self, obj): sigbytes = self._makesigbytes(obj) pubkey = self._pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) self._pubkey.verify(obj['sig'], sigbytes) return True def by_file(self, fname): '''Return a metadata object for the file named fname.''' fobj = FileObject.from_file(fname, self._created_by_ref) return self.sign(fobj) class ObjectStore(object): '''A container to store for the various Metadata objects.''' # The _uuids property contains both the UUIDv4 for objects, and # looking up the UUIDv5 for FileObjects. def __init__(self, created_by_ref): self._created_by_ref = created_by_ref self._uuids = {} self._hashes = {} @staticmethod def makehash(hashstr, strict=True): '''Take a hash string, and return a valid hash string from it. This makes sure that it is of the correct type and length. If strict is False, the function will detect the length and return a valid hash if one can be found.''' try: hash, value = hashstr.split(':') except ValueError: if strict: raise hash = _hashlengths[len(hashstr)] value = hashstr if strict and len(str(value).translate(None, string.hexdigits.lower())) != 0: raise ValueError('value has invalid hex digits (must be lower case)', value) if hash in _validhashes: return ':'.join((hash, value)) raise ValueError def __len__(self): return len(self._uuids) def store(self, fname): '''Write out the objects in the store to the file named fname.''' with open(fname, 'w') as fp: obj = { 'created_by_ref': self._created_by_ref, 'objects': self._uuids.values(), } fp.write(_asn1coder.dumps(obj)) def loadobj(self, obj): '''Load obj into the data store.''' obj = MDBase.create_obj(obj) self._uuids[obj.uuid] = obj for j in obj.hashes: h = self.makehash(j) self._hashes.setdefault(h, []).append(obj) @classmethod def load(cls, fname): '''Load objects from the provided file name. Basic validation will be done on the objects in the file. The objects will be accessible via other methods.''' with open(fname) as fp: objs = _asn1coder.loads(fp.read()) obj = cls(objs['created_by_ref']) for i in objs['objects']: obj.loadobj(i) return obj def by_id(self, id): '''Look up an object by it's UUID.''' if not isinstance(id, uuid.UUID): uid = uuid.UUID(id) else: uid = id return self._uuids[uid] def by_hash(self, hash): '''Look up an object by it's hash value.''' h = self.makehash(hash, strict=False) return self._hashes[h] def by_file(self, fname): '''Return a metadata object for the file named fname.''' fid = FileObject.make_id(fname) try: fobj = self.by_id(fid) except KeyError: # unable to find it fobj = FileObject.from_file(fname, self._created_by_ref) self.loadobj(fobj) for i in fobj.hashes: j = self.by_hash(i) # Filter out non-metadata objects j = [ x for x in j if x.type == 'metadata' ] if j: return j else: raise KeyError('unable to find metadata for file') def _hashfile(fname): hash = getattr(hashlib, _defaulthash)() with open(fname) as fp: r = fp.read() hash.update(r) return '%s:%s' % (_defaulthash, hash.hexdigest()) class FileObject(MDBase): _type = 'file' @staticmethod def make_id(fname): '''Take a local file name, and make the id for it. Note that converts from the local path separator to a forward slash so that it will be the same between Windows and Unix systems.''' fname = os.path.realpath(fname) return uuid.uuid5(_NAMESPACE_MEDASHARE_PATH, '/'.join(os.path.split(fname))) @classmethod def from_file(cls, filename, created_by_ref): s = os.stat(filename) obj = { 'dir': os.path.dirname(filename), 'created_by_ref': created_by_ref, 'filename': os.path.basename(filename), 'id': cls.make_id(filename), 'mtime': datetime.datetime.utcfromtimestamp(s.st_mtime), 'size': s.st_size, 'hashes': [ _hashfile(filename), ], } return cls(obj) def enumeratedir(_dir, created_by_ref): '''Enumerate all the files and directories (not recursive) in _dir. Returned is a list of FileObjects.''' return map(lambda x: FileObject.from_file(os.path.join(_dir, x), created_by_ref), os.listdir(_dir)) def main(): from optparse import OptionParser import sys parser = OptionParser() parser.add_option('-a', action='append', dest='add', default=[], help='add the arg as metadata for files, tag=value') parser.add_option('-d', action='append', dest='delete', default=[], help='delete the arg as metadata from files. Either specify tag, and all tags are removed, or specify tag=value and that specific tag will be removed.') parser.add_option('-g', action='store_true', dest='generateident', default=False, help='generate an identity') parser.add_option('-i', action='store_true', dest='updateident', default=False, help='update the identity') parser.add_option('-l', action='store_true', dest='list', default=False, help='list metadata') parser.add_option('-p', action='store_true', dest='printpub', default=False, help='Print the public key of the identity') options, args = parser.parse_args() # this is shared between generateident and add addprops = map(lambda x: x.split('=', 1), options.add) if options.generateident or options.updateident or options.printpub: identfname = os.path.expanduser('~/.medashare_identity.pasn1') if options.generateident and os.path.exists(identfname): print >>sys.stderr, 'Error: Identity already created.' sys.exit(1) if options.generateident: persona = Persona() persona.generate_key() else: persona = Persona.load(identfname) if options.printpub: print persona.get_pubkey() return persona.new_version(*addprops) persona.store(identfname) return storefname = os.path.expanduser('~/.medashare_store.pasn1') import sys #print >>sys.stderr, `storefname` objstr = ObjectStore.load(storefname) if options.list: for i in args: for j in objstr.by_file(i): #print >>sys.stderr, `j._obj` for k, v in _iterdictlist(j): print '%s:\t%s' % (k, v) elif options.add: for i in args: for j in objstr.by_file(i): nobj = j.new_version(*addprops) objstr.loadobj(nobj) elif options.delete: for i in args: for j in objstr.by_file(i): obj = j.__to_dict__() for k in options.delete: try: key, v = k.split('=', 1) obj[key].remove(v) except ValueError: del obj[k] nobj = MDBase.create_obj(obj) objstr.loadobj(nobj) else: # pragma: no cover raise NotImplementedError objstr.store(storefname) if __name__ == '__main__': # pragma: no cover main() class _TestCononicalCoder(unittest.TestCase): def test_con(self): obja = { 'foo': 23984732, 'a': 5, 'b': 6, 'something': '2398472398723498273dfasdfjlaksdfj' } objaitems = obja.items() objaitems.sort() objb = dict(objaitems) self.assertEqual(obja, objb) # This is to make sure that item order changed self.assertNotEqual(obja.items(), objb.items()) astr = pasn1.dumps(obja) bstr = pasn1.dumps(objb) self.assertNotEqual(astr, bstr) astr = _asn1coder.dumps(obja) bstr = _asn1coder.dumps(objb) self.assertEqual(astr, bstr) class _TestCases(unittest.TestCase): def setUp(self): d = os.path.realpath(tempfile.mkdtemp()) self.basetempdir = d self.tempdir = os.path.join(d, 'subdir') persona = Persona.load(os.path.join('fixtures', 'sample.persona.pasn1')) self.created_by_ref = persona.get_identity().uuid shutil.copytree(os.path.join('fixtures', 'testfiles'), self.tempdir) def tearDown(self): shutil.rmtree(self.basetempdir) self.tempdir = None def test_mdbase(self): self.assertRaises(ValueError, MDBase, created_by_ref='') self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'unknosldkfj' }) self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'metadata' }) baseobj = { 'type': 'metadata', 'created_by_ref': self.created_by_ref, } origbase = copy.deepcopy(baseobj) # that when an MDBase object is created md = MDBase.create_obj(baseobj) # it doesn't modify the passed in object (when adding # generated properties) self.assertEqual(baseobj, origbase) # and it has the generted properties # Note: cannot mock the functions as they are already # referenced at creation time self.assertIn('uuid', md) self.assertIn('modified', md) # That you can create a new version using new_version md2 = md.new_version(('dc:creator', 'Jim Bob',)) # that they are different self.assertNotEqual(md, md2) # and that the new modified time is different from the old self.assertNotEqual(md.modified, md2.modified) # and that the modification is present self.assertEqual(md2['dc:creator'], [ 'Jim Bob' ]) # that providing a value from common property fvalue = 'fakesig' md3 = md.new_version(('sig', fvalue)) # gets set directly, and is not a list self.assertEqual(md3.sig, fvalue) def test_mdbase_encode_decode(self): # that an object baseobj = { 'type': 'metadata', 'created_by_ref': self.created_by_ref, } obj = MDBase.create_obj(baseobj) # can be encoded coded = obj.encode() # and that the rsults can be decoded decobj = MDBase.decode(coded) # and that they are equal self.assertEqual(obj, decobj) # and in the encoded object eobj = _asn1coder.loads(coded) # the uuid property is a str instance self.assertIsInstance(eobj['uuid'], str) # and has the length of 16 self.assertEqual(len(eobj['uuid']), 16) def test_mdbase_wrong_type(self): # that created_by_ref can be passed by kw obj = MetaData(created_by_ref=self.created_by_ref) self.assertRaises(ValueError, FileObject, dict(obj.items(False))) def test_makehash(self): self.assertRaises(ValueError, ObjectStore.makehash, 'slkj') self.assertRaises(ValueError, ObjectStore.makehash, 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA') self.assertRaises(ValueError, ObjectStore.makehash, 'bogushash:9e0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA', strict=False) self.assertEqual(ObjectStore.makehash('cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', strict=False), 'sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e') self.assertEqual(ObjectStore.makehash('e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', strict=False), 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855') def test_enumeratedir(self): files = enumeratedir(self.tempdir, self.created_by_ref) ftest = files[0] fname = 'test.txt' # make sure that they are of type MDBase self.assertIsInstance(ftest, MDBase) oldid = ftest.id self.assertEqual(ftest.filename, fname) self.assertEqual(ftest.dir, self.tempdir) # XXX - do we add host information? self.assertEqual(ftest.id, uuid.uuid5(_NAMESPACE_MEDASHARE_PATH, '/'.join(os.path.split(self.tempdir) + ( fname, )))) self.assertEqual(ftest.mtime, datetime.datetime(2019, 5, 20, 21, 47, 36)) self.assertEqual(ftest.size, 15) self.assertIn('sha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f', ftest.hashes) # XXX - make sure works w/ relative dirs files = enumeratedir(os.path.relpath(self.tempdir), self.created_by_ref) self.assertEqual(oldid, files[0].id) def test_mdbaseoverlay(self): objst = ObjectStore(self.created_by_ref) # that a base object bid = uuid.uuid4() objst.loadobj({ 'type': 'metadata', 'uuid': bid, 'modified': datetime.datetime(2019, 6, 10, 14, 3, 10), 'created_by_ref': self.created_by_ref, 'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ], 'someprop': [ 'somevalue' ], 'lang': 'en', }) # can have an overlay object oid = uuid.uuid4() dhash = 'sha256:a7c96262c21db9a06fd49e307d694fd95f624569f9b35bb3ffacd880440f9787' objst.loadobj({ 'type': 'metadata', 'uuid': oid, 'modified': datetime.datetime(2019, 6, 10, 18, 3, 10), 'created_by_ref': self.created_by_ref, 'hashes': [ dhash ], 'overlay_refs': [ bid ], 'lang': 'en', }) # and that when you get it's properties oobj = objst.by_id(oid) odict = dict(oobj.items()) # that is has the overlays property self.assertEqual(odict['overlay_refs'], [ bid ]) # that it doesn't have a common property self.assertNotIn('type', odict) # that when skipcommon is False odict = dict(oobj.items(False)) # that it does have a common property self.assertIn('type', odict) def test_persona(self): # that a newly created persona persona = Persona() # has an identity object idobj = persona.get_identity() # and that it has a uuid attribute that matches self.assertEqual(persona.uuid, idobj['uuid']) # that a key can be generated persona.generate_key() # that the pubkey property is present idobj = persona.get_identity() self.assertIsInstance(idobj['pubkey'], str) # that get_pubkey returns the correct thing pubstr = _asn1coder.dumps([ idobj.uuid, idobj['pubkey'] ]) self.assertEqual(persona.get_pubkey(), base58.b58encode_check(pubstr)) # and that there is a signature self.assertIsInstance(idobj['sig'], str) # and that it can verify itself persona.verify(idobj) # and that a new persona can be created from the pubkey pkpersona = Persona.from_pubkey(persona.get_pubkey()) # and that it can verify the old identity self.assertTrue(pkpersona.verify(idobj)) # that a second time, it raises an exception self.assertRaises(RuntimeError, persona.generate_key) # that a file object created by it testfname = os.path.join(self.tempdir, 'test.txt') testobj = persona.by_file(testfname) # has the correct created_by_ref self.assertEqual(testobj.created_by_ref, idobj.uuid) # and has a signature self.assertIn('sig', testobj) # that a persona created from the identity object vpersona = Persona(idobj) # can verify the sig self.assertTrue(vpersona.verify(testobj)) # and that a bogus signature bogussig = 'somebogussig' bogusobj = MDBase.create_obj(testobj) bogusobj.sig = bogussig # fails to verify self.assertRaises(Exception, vpersona.verify, bogusobj) # and that a modified object otherobj = testobj.new_version(('customprop', 'value')) # fails to verify self.assertRaises(Exception, vpersona.verify, otherobj) # that a persona object can be written perpath = os.path.join(self.basetempdir, 'persona.pasn1') persona.store(perpath) # and that when loaded back loadpersona = Persona.load(perpath) # the new persona object can sign an object nvtestobj = loadpersona.sign(testobj.new_version()) # and the old persona can verify it. self.assertTrue(vpersona.verify(nvtestobj)) def test_objectstore(self): objst = ObjectStore.load(os.path.join('fixtures', 'sample.data.pasn1')) objst.loadobj({ 'type': 'metadata', 'uuid': uuid.UUID('c9a1d1e2-3109-4efd-8948-577dc15e44e7'), 'modified': datetime.datetime(2019, 5, 31, 14, 3, 10), 'created_by_ref': self.created_by_ref, 'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ], 'lang': 'en', }) lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada') self.assertEqual(len(lst), 2) byid = objst.by_id('3e466e06-45de-4ecc-84ba-2d2a3d970e96') self.assertIsInstance(byid, MetaData) self.assertIn(byid, lst) r = byid self.assertEqual(r.uuid, uuid.UUID('3e466e06-45de-4ecc-84ba-2d2a3d970e96')) self.assertEqual(r['dc:creator'], [ u'John-Mark Gurney' ]) fname = 'testfile.pasn1' objst.store(fname) with open(fname) as fp: objs = _asn1coder.loads(fp.read()) os.unlink(fname) self.assertEqual(len(objs), len(objst)) self.assertEqual(objs['created_by_ref'], self.created_by_ref.bytes) for i in objs['objects']: i['created_by_ref'] = uuid.UUID(bytes=i['created_by_ref']) i['uuid'] = uuid.UUID(bytes=i['uuid']) self.assertEqual(objst.by_id(i['uuid']), i) testfname = os.path.join(self.tempdir, 'test.txt') self.assertEqual(objst.by_file(testfname), [ byid ]) self.assertEqual(objst.by_file(testfname), [ byid ]) self.assertRaises(KeyError, objst.by_file, '/dev/null') # XXX make sure that object store contains fileobject # Tests to add: # Non-duplicates when same metadata is located by multiple hashes. def test_main(self): # Test the main runner, this is only testing things that are # specific to running the program, like where the store is # created. # setup object store storefname = os.path.join(self.tempdir, 'storefname') identfname = os.path.join(self.tempdir, 'identfname') shutil.copy(os.path.join('fixtures', 'sample.data.pasn1'), storefname) # setup path mapping def expandusermock(arg): if arg == '~/.medashare_store.pasn1': return storefname elif arg == '~/.medashare_identity.pasn1': return identfname # setup test fname testfname = os.path.join(self.tempdir, 'test.txt') import sys import StringIO import itertools with mock.patch('os.path.expanduser', side_effect=expandusermock) \ as eu: # that generating a new identity with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-g', '-a', 'name=A Test User' ])) as (stdout, argv): main() # does not output anything self.assertEqual(stdout.getvalue(), '') # looks up the correct file eu.assert_called_with('~/.medashare_identity.pasn1') # and that the identity persona = Persona.load(identfname) pident = persona.get_identity() # has the correct name self.assertEqual(pident.name, 'A Test User') # that when generating an identity when one already exists with nested(mock.patch('sys.stderr', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-g', '-a', 'name=A Test User' ])) as (stderr, argv): # that it exits with self.assertRaises(SystemExit) as cm: main() # with error code 1 self.assertEqual(cm.exception[0], 1) # and outputs an error message self.assertEqual(stderr.getvalue(), 'Error: Identity already created.\n') # and looked up the correct file eu.assert_called_with('~/.medashare_identity.pasn1') # that when updating the identity with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-i', '-a', 'name=Changed Name' ])) as (stdout, argv): main() # it doesn't output anything self.assertEqual(stdout.getvalue(), '') # and looked up the correct file eu.assert_called_with('~/.medashare_identity.pasn1') npersona = Persona.load(identfname) nident = npersona.get_identity() # and has the new name self.assertEqual(nident.name, 'Changed Name') # and has the same old uuid self.assertEqual(nident.uuid, pident.uuid) # and that the modified date has changed self.assertNotEqual(pident.modified, nident.modified) # and that the old Persona can verify the new one self.assertTrue(persona.verify(nident)) # that when asked to print the public key with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-p' ])) as (stdout, argv): main() # the correct key is printed self.assertEqual(stdout.getvalue(), '%s\n' % persona.get_pubkey()) # and looked up the correct file eu.assert_called_with('~/.medashare_identity.pasn1') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'dc:creator:\tJohn-Mark Gurney\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n') eu.assert_called_with('~/.medashare_store.pasn1') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-a', 'dc:creator=Another user', '-a', 'foo=bar=baz', testfname ])) as (stdout, argv): main() with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'dc:creator:\tAnother user\ndc:creator:\tJohn-Mark Gurney\nfoo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-d', 'dc:creator', testfname ])) as (stdout, argv): main() with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'foo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-a', 'foo=bleh', testfname ])) as (stdout, argv): main() with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'foo:\tbar=baz\nfoo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n') with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-d', 'foo=bar=baz', testfname ])) as (stdout, argv): main() with nested(mock.patch('sys.stdout', StringIO.StringIO()), mock.patch('sys.argv', [ 'progname', '-l', testfname ])) as (stdout, argv): main() self.assertEqual(stdout.getvalue(), 'foo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')