|
- #!/usr/bin/env python
-
- #import pdb, sys; mypdb = pdb.Pdb(stdout=sys.stderr); mypdb.set_trace()
-
- from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, \
- Ed448PublicKey
- from cryptography.hazmat.primitives.serialization import Encoding, \
- PrivateFormat, PublicFormat, NoEncryption
-
- import base58
- import copy
- import datetime
- import functools
- import hashlib
- import mock
- import os.path
- import pasn1
- import shutil
- import string
- import tempfile
- import unittest
- import uuid
-
- from contextlib import nested
-
# The UUID for the namespace representing the path to a file
_NAMESPACE_MEDASHARE_PATH = uuid.UUID('f6f36b62-3770-4a68-bc3d-dc3e31e429e6')

# Name of the hashlib algorithm used when hashing the contents of a file.
_defaulthash = 'sha512'
# Names of the hash algorithms accepted in "<type>:<hexvalue>" hash strings.
_validhashes = set([ 'sha256', 'sha512' ])
# Maps the length of each valid algorithm's hex digest to the algorithm
# name, so that a bare hex string can be matched to an algorithm by length.
_hashlengths = { len(getattr(hashlib, x)().hexdigest()): x for x in _validhashes }
-
- def _iterdictlist(obj):
- itms = obj.items()
- itms.sort()
- for k, v in itms:
- if isinstance(v, list):
- v = v[:]
- v.sort()
- for i in v:
- yield k, i
- else:
- yield k, v
-
- def _makeuuid(s):
- if isinstance(s, uuid.UUID):
- return s
-
- return uuid.UUID(bytes=s)
-
- # XXX - known issue, store is not atomic/safe, overwrites in place instead of renames
-
- # XXX - add validation
- # XXX - how to add singletons
class MDBase(object):
    '''This is a simple wrapper that turns a JSON object into a pythonesque
    object where attribute accesses work.

    The wrapped dictionary is kept in self._obj.  Names starting with an
    underscore are real instance attributes; every other attribute read
    or write is forwarded to that dictionary.

    Subclasses set _type to the value of the 'type' property that they
    represent; create_obj uses it to pick the class to instantiate.'''

    _type = 'invalid'

    # Properties that are filled in, by calling the function, when the
    # wrapped object does not already have them.
    _generated_properties = {
        'uuid': uuid.uuid4,
        'modified': datetime.datetime.utcnow
    }

    # When decoding, the decoded value should be passed to this function
    # to get the correct type
    _instance_properties = {
        'uuid': _makeuuid,
        'created_by_ref': _makeuuid,
    }

    # Properties that must be present when an object is created.
    _common_properties = [ 'type', 'created_by_ref' ] # XXX - add lang?
    # Together with _common_names, the properties that new_version
    # assigns directly instead of appending to a list.
    _common_optional = set(('parent_refs', 'sig'))
    _common_names = set(_common_properties + list(_generated_properties))

    def __init__(self, obj=None, **kwargs):
        '''Wrap a copy of obj, updated with kwargs.

        Raises ValueError if called on MDBase itself, if obj has a
        'type' that does not match this class, or if one of the
        common properties is missing.'''

        # Work on a private copy so the caller's object is never modified.
        obj = {} if obj is None else copy.deepcopy(obj)
        obj.update(kwargs)

        if self._type == MDBase._type:
            raise ValueError('call MDBase.create_obj instead so correct class is used.')

        if 'type' in obj and obj['type'] != self._type:
            raise ValueError(
                'trying to create the wrong type of object, got: %r, expected: %r' %
                (obj['type'], self._type))

        if 'type' not in obj:
            obj['type'] = self._type

        for x in self._common_properties:
            if x not in obj:
                raise ValueError('common property %r not present' % (x,))

        # convert decoded values to their proper types
        for x, fun in self._instance_properties.items():
            if x in obj:
                obj[x] = fun(obj[x])

        # fill in anything that is generated when absent
        for x, fun in self._generated_properties.items():
            if x not in obj:
                obj[x] = fun()

        self._obj = obj

    @classmethod
    def create_obj(cls, obj):
        '''Using obj as a base, create an instance of MDBase of the
        correct type.

        obj is either a dictionary or an instance of cls.  Only the
        direct subclasses of MDBase are considered.

        If the correct type is not found, a ValueError is raised.  A
        dictionary without a 'type' key raises KeyError.'''

        if isinstance(obj, cls):
            obj = obj._obj

        ty = obj['type']

        for i in MDBase.__subclasses__():
            if i._type == ty:
                return i(obj)

        raise ValueError('Unable to find class for type %r' % (ty,))

    def new_version(self, *args):
        '''For each k, v pair, add the property k as an additional one
        (or new one if first), with the value v.

        Properties in the common sets are assigned directly instead
        of being appended to a list.  The returned object is a new
        instance with a freshly generated modified time; self is not
        changed.'''

        obj = copy.deepcopy(self._obj)

        common = self._common_names | self._common_optional
        for k, v in args:
            if k in common:
                obj[k] = v
            else:
                obj.setdefault(k, []).append(v)

        # dropped so that the constructor generates a new one
        del obj['modified']

        return self.create_obj(obj)

    def __repr__(self): # pragma: no cover
        return '%s(%r)' % (self.__class__.__name__, self._obj)

    def __getattr__(self, k):
        # __getattr__ is only called when normal lookup fails.  If _obj
        # itself is missing (instance not yet initialized, e.g. while
        # being copied), looking it up below would recurse forever.
        if k == '_obj':
            raise AttributeError(k)

        try:
            return self._obj[k]
        except KeyError:
            raise AttributeError(k)

    def __setattr__(self, k, v):
        if k[0] == '_': # direct attribute
            self.__dict__[k] = v
        else:
            self._obj[k] = v

    def __getitem__(self, k):
        return self._obj[k]

    def __to_dict__(self):
        '''Return the wrapped dictionary itself (not a copy).'''

        return self._obj

    def __eq__(self, o):
        '''Compare the wrapped dictionary against o, which may be a
        dictionary or another MDBase.'''

        if isinstance(o, MDBase):
            o = o._obj

        return self._obj == o

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__; without this two
        # equal objects would still compare as different.
        return not self == o

    def __contains__(self, k):
        return k in self._obj

    def items(self, skipcommon=True):
        '''Return a list of the (key, value) pairs of the object.

        Unless skipcommon is False, the properties in _common_names
        are left out.'''

        return [ (k, v) for k, v in self._obj.items() if
            not skipcommon or k not in self._common_names ]

    def encode(self):
        '''Return the object serialized by the module's ASN.1 coder.'''

        return _asn1coder.dumps(self)

    @classmethod
    def decode(cls, s):
        '''Create an object from a string produced by encode.'''

        return cls.create_obj(_asn1coder.loads(s))
-
class MetaData(MDBase):
    '''MDBase subclass for objects whose type is 'metadata'.'''

    _type = 'metadata'
-
class Identity(MDBase):
    '''MDBase subclass for objects whose type is 'identity'.'''

    _type = 'identity'

    # Identities don't need a created by
    _common_properties = [ prop for prop in MDBase._common_properties
        if prop != 'created_by_ref' ]
    # no parent_refs, but name and pubkey are set directly
    _common_optional = (MDBase._common_optional - set([ 'parent_refs' ])) | \
        set([ 'name', 'pubkey' ])
    _common_names = set(_common_properties) | set(MDBase._generated_properties)
-
- def _trytodict(o):
- if isinstance(o, uuid.UUID):
- return 'bytes', o.bytes
- try:
- return 'dict', o.__to_dict__()
- except Exception: # pragma: no cover
- raise TypeError('unable to find __to_dict__ on %s: %s' % (type(o), `o`))
-
class CanonicalCoder(pasn1.ASN1DictCoder):
    '''ASN1DictCoder that encodes the items of a dictionary in sorted
    order, so that equal dictionaries encode to the same string.'''

    def enc_dict(self, obj, **kwargs):
        # Stand-in for obj that only offers iteritems(), handing the
        # items out sorted.
        # NOTE(review): presumably the base class walks the dictionary
        # only through iteritems() -- confirm against pasn1.
        class SortedItems:
            def iteritems(self):
                return iter(sorted(obj.items()))

        return pasn1.ASN1DictCoder.enc_dict(self, SortedItems(), **kwargs)

# Coder shared by the module; objects it cannot encode natively are
# converted by _trytodict.
_asn1coder = CanonicalCoder(coerce=_trytodict)
-
class Persona(object):
    '''The object that represents a persona, or identity.  It will
    create the proper identity object, serialize for saving keys,
    create objects for that persona and other management.

    A Persona may hold a private key (needed by sign), a public key
    (needed by verify, taken from the identity's pubkey property),
    both, or neither.'''

    def __init__(self, identity=None, key=None):
        '''identity is the Identity object to use, a new one is
        created when it is None.  key, if provided, is the
        Ed448PrivateKey used for signing.'''

        if identity is None:
            self._identity = Identity()
        else:
            self._identity = identity

        self._key = key
        self._pubkey = None

        if 'pubkey' in self._identity:
            pubkeybytes = self._identity.pubkey
            self._pubkey = Ed448PublicKey.from_public_bytes(pubkeybytes)

        self._created_by_ref = self._identity.uuid

    def MetaData(self, *args, **kwargs):
        '''Create a signed MetaData object that has this persona as
        its created_by_ref.  The arguments are passed to MetaData.'''

        kwargs['created_by_ref'] = self.uuid

        return self.sign(MetaData(*args, **kwargs))

    @property
    def uuid(self):
        '''Return the UUID of the identity represented.'''

        return self._identity.uuid

    def __repr__(self): # pragma: no cover
        r = '<Persona: has key: %s, has pubkey: %s, identity: %r>' % \
            (self._key is not None, self._pubkey is not None,
            self._identity)

        return r

    @classmethod
    def from_pubkey(cls, pubkeystr):
        '''Create a Persona from the string returned by get_pubkey.

        The resulting Persona has no private key.'''

        pubstr = base58.b58decode_check(pubkeystr)

        # not named uuid, to avoid shadowing the uuid module
        identuuid, pubkey = _asn1coder.loads(pubstr)

        ident = Identity(uuid=identuuid, pubkey=pubkey)

        return cls(ident)

    def get_identity(self):
        '''Return the Identity object for this Persona.'''

        return self._identity

    def get_pubkey(self):
        '''Get a printable version of the public key.  This is used
        for importing into different programs, or for shared.'''

        idobj = self._identity
        pubstr = _asn1coder.dumps([ idobj.uuid, idobj.pubkey ])

        return base58.b58encode_check(pubstr)

    def new_version(self, *args):
        '''Update the Persona's Identity object.

        The arguments are passed to the identity's new_version; the
        result is signed, becomes the current identity and is
        returned.'''

        self._identity = self.sign(self._identity.new_version(*args))

        return self._identity

    def store(self, fname):
        '''Store the Persona to a file.  If there is a private
        key associated w/ the Persona, it will be saved as well.'''

        obj = {
            'identity': self._identity,
        }
        if self._key is not None:
            obj['key'] = \
                self._key.private_bytes(Encoding.Raw,
                PrivateFormat.Raw, NoEncryption())

        # Encode before opening, so that a failure to encode does not
        # leave a truncated file where the identity (and key) used to be.
        data = _asn1coder.dumps(obj)

        # The encoding is binary data, so no text mode translation.
        with open(fname, 'wb') as fp:
            fp.write(data)

    @classmethod
    def load(cls, fname):
        '''Load the Persona from the provided file.'''

        with open(fname, 'rb') as fp:
            objs = _asn1coder.loads(fp.read())

        kwargs = {}
        if 'key' in objs:
            kwargs['key'] = Ed448PrivateKey.from_private_bytes(objs['key'])

        return cls(Identity(objs['identity']), **kwargs)

    def generate_key(self):
        '''Generate a key for this Identity.

        The identity is replaced by a new, signed, version that has
        the public key as its pubkey property.

        Raises a RuntimeError if a key is already present.'''

        if self._key is not None:
            raise RuntimeError('a key already exists')

        self._key = Ed448PrivateKey.generate()
        self._pubkey = self._key.public_key()
        pubkey = self._pubkey.public_bytes(Encoding.Raw,
            PublicFormat.Raw)
        self._identity = self.sign(self._identity.new_version(('pubkey',
            pubkey)))

    def _makesigbytes(self, obj):
        '''Return the encoding of obj, with all of its properties
        except sig, that is signed and verified.'''

        obj = dict(obj.items(False))
        try:
            del obj['sig']
        except KeyError:
            pass

        return _asn1coder.dumps(obj)

    def sign(self, obj):
        '''Takes the object, adds a signature, and returns the new
        object.

        The private key must be present.'''

        sigbytes = self._makesigbytes(obj)
        sig = self._key.sign(sigbytes)
        newobj = MDBase.create_obj(obj)
        newobj.sig = sig

        return newobj

    def verify(self, obj):
        '''Verify the signature of obj using the public key.

        Returns True when the signature is valid; otherwise the
        exception raised by the key's verify method propagates.'''

        sigbytes = self._makesigbytes(obj)

        self._pubkey.verify(obj['sig'], sigbytes)

        return True

    def by_file(self, fname):
        '''Return a metadata object for the file named fname.'''

        fobj = FileObject.from_file(fname, self._created_by_ref)

        return self.sign(fobj)
-
class ObjectStore(object):
    '''A container to store for the various Metadata objects.

    Objects are indexed by their uuid property (_uuids) and by each of
    their hashes (_hashes, a list of objects per hash string).'''

    # NOTE(review): an earlier comment said that _uuids also allows
    # looking up the UUIDv5 (id) of FileObjects, but loadobj only keys
    # on the uuid property -- confirm what is intended.

    def __init__(self, created_by_ref):
        self._created_by_ref = created_by_ref
        self._uuids = {}
        self._hashes = {}

    @staticmethod
    def makehash(hashstr, strict=True):
        '''Take a hash or hash string, and return a valid hash
        string from it.

        This makes sure that it is of the correct type and length.

        If strict is False, the function will detect the length and
        return a valid hash string if one can be found.  In that case
        a string whose length matches no known hash raises KeyError.

        By default, the string must be prepended by the type,
        followed by a colon, followed by the value in hex in all
        lower case characters.

        Raises ValueError if the string is not valid.'''

        try:
            hashtype, value = hashstr.split(':')
        except ValueError:
            if strict:
                raise

            hashtype = _hashlengths[len(hashstr)]
            value = hashstr

        # anything left after removing the lower case hex digits is invalid
        if strict and set(str(value)) - set(string.hexdigits.lower()):
            raise ValueError('value has invalid hex digits (must be lower case)', value)

        if hashtype not in _validhashes:
            raise ValueError('unsupported hash type: %r' % (hashtype,))

        return ':'.join((hashtype, value))

    def __len__(self):
        return len(self._uuids)

    def store(self, fname):
        '''Write out the objects in the store to the file named
        fname.'''

        obj = {
            'created_by_ref': self._created_by_ref,
            'objects': list(self._uuids.values()),
        }

        # Encode before opening, so that a failure to encode does not
        # truncate an existing store.
        data = _asn1coder.dumps(obj)

        # The encoding is binary data, so no text mode translation.
        with open(fname, 'wb') as fp:
            fp.write(data)

    def _dropfromhashes(self, obj):
        '''Remove obj, compared by identity, from the hash index.'''

        for h in list(self._hashes):
            kept = [ x for x in self._hashes[h] if x is not obj ]
            if kept:
                self._hashes[h] = kept
            else:
                del self._hashes[h]

    def loadobj(self, obj):
        '''Load obj into the data store.

        obj is a dictionary or MDBase object, and must have a hashes
        property.  If an object with the same uuid is already in the
        store, it is replaced.'''

        obj = MDBase.create_obj(obj)

        # Normalize (and thereby validate) every hash before an index
        # is touched, so that a bad hash cannot leave the store half
        # updated.
        hashes = [ self.makehash(j) for j in obj.hashes ]

        # Make sure the version being replaced is no longer found by
        # hash, otherwise by_hash returns the old and the new version.
        previous = self._uuids.get(obj.uuid)
        if previous is not None:
            self._dropfromhashes(previous)

        self._uuids[obj.uuid] = obj
        for h in hashes:
            self._hashes.setdefault(h, []).append(obj)

    @classmethod
    def load(cls, fname):
        '''Load objects from the provided file name.

        Basic validation will be done on the objects in the file.

        The objects will be accessible via other methods.'''

        with open(fname, 'rb') as fp:
            objs = _asn1coder.loads(fp.read())

        obj = cls(objs['created_by_ref'])
        for i in objs['objects']:
            obj.loadobj(i)

        return obj

    def by_id(self, id):
        '''Look up an object by its UUID.

        id is a uuid.UUID or a string that uuid.UUID accepts.  Raises
        KeyError if there is no such object.'''

        if not isinstance(id, uuid.UUID):
            uid = uuid.UUID(id)
        else:
            uid = id

        return self._uuids[uid]

    def by_hash(self, hash):
        '''Look up the list of objects that have the hash value.

        The type prefix of the hash may be left off.  Raises KeyError
        if no object has the hash.'''

        h = self.makehash(hash, strict=False)
        return self._hashes[h]

    def by_file(self, fname):
        '''Return the list of metadata objects for the file named
        fname.

        Raises KeyError if no metadata is found for the file.'''

        fid = FileObject.make_id(fname)
        try:
            fobj = self.by_id(fid)
        except KeyError:
            # unable to find it
            fobj = FileObject.from_file(fname, self._created_by_ref)
            self.loadobj(fobj)

        for i in fobj.hashes:
            j = self.by_hash(i)

            # Filter out non-metadata objects
            j = [ x for x in j if x.type == 'metadata' ]
            if j:
                return j

        raise KeyError('unable to find metadata for file')
-
- def _hashfile(fname):
- hash = getattr(hashlib, _defaulthash)()
- with open(fname) as fp:
- r = fp.read()
- hash.update(r)
-
- return '%s:%s' % (_defaulthash, hash.hexdigest())
-
class FileObject(MDBase):
    '''MDBase subclass for objects whose type is 'file'.'''

    _type = 'file'

    @staticmethod
    def make_id(fname):
        '''Take a local file name, and make the id for it.

        The id is the UUIDv5, in the path namespace, of the real path
        of the file with its directory part and its last component
        joined by a forward slash.'''

        dirpart, lastpart = os.path.split(os.path.realpath(fname))
        pathname = '/'.join((dirpart, lastpart))

        return uuid.uuid5(_NAMESPACE_MEDASHARE_PATH, pathname)

    @classmethod
    def from_file(cls, filename, created_by_ref):
        '''Create the object describing the file named filename,
        recording created_by_ref as its creator.'''

        st = os.stat(filename)

        props = dict()
        props['dir'] = os.path.dirname(filename)
        props['created_by_ref'] = created_by_ref
        props['filename'] = os.path.basename(filename)
        props['id'] = cls.make_id(filename)
        props['mtime'] = datetime.datetime.utcfromtimestamp(st.st_mtime)
        props['size'] = st.st_size
        props['hashes'] = [ _hashfile(filename) ]

        return cls(props)
-
def enumeratedir(_dir, created_by_ref):
    '''Enumerate all the files and directories (not recursive) in _dir.

    Returned is a list of FileObjects, one per entry, in the order
    that os.listdir returns the entries.'''

    return [ FileObject.from_file(os.path.join(_dir, name), created_by_ref)
        for name in os.listdir(_dir) ]
-
def main():
    '''Command line interface.

    With -g, -i or -p the identity file (~/.medashare_identity.pasn1)
    is created, updated or printed.  Otherwise the object store
    (~/.medashare_store.pasn1) is loaded, the metadata of the files
    given as arguments is listed (-l), added to (-a) or deleted from
    (-d), and the store is written back.'''

    from optparse import OptionParser
    import sys

    parser = OptionParser()
    parser.add_option('-a', action='append', dest='add',
        default=[], help='add the arg as metadata for files, tag=value')
    parser.add_option('-d', action='append', dest='delete',
        default=[], help='delete the arg as metadata from files.  Either specify tag, and all tags are removed, or specify tag=value and that specific tag will be removed.')
    parser.add_option('-g', action='store_true', dest='generateident',
        default=False, help='generate an identity')
    parser.add_option('-i', action='store_true', dest='updateident',
        default=False, help='update the identity')
    parser.add_option('-l', action='store_true', dest='list',
        default=False, help='list metadata')
    parser.add_option('-p', action='store_true', dest='printpub',
        default=False, help='Print the public key of the identity')

    options, args = parser.parse_args()

    # this is shared between generateident and add
    addprops = [ x.split('=', 1) for x in options.add ]

    # Report a missing = here, instead of failing later on when the
    # pairs are unpacked.
    for prop in addprops:
        if len(prop) != 2:
            parser.error('-a requires tag=value, got: %r' % (prop[0],))

    if options.generateident or options.updateident or options.printpub:
        identfname = os.path.expanduser('~/.medashare_identity.pasn1')

        if options.generateident and os.path.exists(identfname):
            sys.stderr.write('Error: Identity already created.\n')
            sys.exit(1)

        if options.generateident:
            persona = Persona()
            persona.generate_key()
        else:
            persona = Persona.load(identfname)

        if options.printpub:
            print(persona.get_pubkey())
            return

        persona.new_version(*addprops)
        persona.store(identfname)
        return

    storefname = os.path.expanduser('~/.medashare_store.pasn1')
    objstr = ObjectStore.load(storefname)

    if options.list:
        for i in args:
            for j in objstr.by_file(i):
                for k, v in _iterdictlist(j):
                    print('%s:\t%s' % (k, v))
    elif options.add:
        for i in args:
            for j in objstr.by_file(i):
                nobj = j.new_version(*addprops)
                objstr.loadobj(nobj)
    elif options.delete:
        for i in args:
            for j in objstr.by_file(i):
                obj = j.__to_dict__()
                for k in options.delete:
                    # tag=value removes one value, tag removes them
                    # all.  The two cases are told apart explicitly,
                    # so that a value that is not present is not
                    # mistaken for an argument without an =.
                    try:
                        if '=' in k:
                            key, v = k.split('=', 1)
                            obj[key].remove(v)
                        else:
                            del obj[k]
                    except (KeyError, ValueError):
                        sys.stderr.write(
                            'Error: unable to delete %r, not present.\n' % (k,))
                        sys.exit(1)
                nobj = MDBase.create_obj(obj)
                objstr.loadobj(nobj)
    else: # pragma: no cover
        raise NotImplementedError

    objstr.store(storefname)
-
# Run the command line interface only when executed as a script, not
# when the module is imported.
if __name__ == '__main__': # pragma: no cover
    main()
-
class _TestCononicalCoder(unittest.TestCase):
    '''Test that the canonical coder encodes equal dictionaries to the
    same string, independent of their item order.'''

    def test_con(self):
        # Build two equal dictionaries, the second one from the sorted
        # items of the first.
        obja = { 'foo': 23984732, 'a': 5, 'b': 6, 'something': '2398472398723498273dfasdfjlaksdfj' }
        objaitems = obja.items()
        objaitems.sort()
        objb = dict(objaitems)

        self.assertEqual(obja, objb)

        # This is to make sure that item order changed
        self.assertNotEqual(obja.items(), objb.items())

        # that the plain coder encodes them differently
        astr = pasn1.dumps(obja)
        bstr = pasn1.dumps(objb)

        self.assertNotEqual(astr, bstr)

        # but that the canonical coder encodes them the same
        astr = _asn1coder.dumps(obja)
        bstr = _asn1coder.dumps(objb)

        self.assertEqual(astr, bstr)
-
class _TestCases(unittest.TestCase):
    '''Tests for the metadata objects, the persona, the object store
    and the command line.

    The tests read the files in the fixtures directory, using paths
    relative to the current directory.'''

    def setUp(self):
        # Each test gets a copy of the test files in its own temporary
        # directory.
        d = os.path.realpath(tempfile.mkdtemp())
        self.basetempdir = d
        self.tempdir = os.path.join(d, 'subdir')

        persona = Persona.load(os.path.join('fixtures', 'sample.persona.pasn1'))
        self.created_by_ref = persona.get_identity().uuid

        shutil.copytree(os.path.join('fixtures', 'testfiles'),
            self.tempdir)

    def tearDown(self):
        shutil.rmtree(self.basetempdir)
        self.tempdir = None

    def test_mdbase(self):
        self.assertRaises(ValueError, MDBase, created_by_ref='')
        self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'unknosldkfj' })
        self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'metadata' })

        baseobj = {
            'type': 'metadata',
            'created_by_ref': self.created_by_ref,
        }
        origbase = copy.deepcopy(baseobj)

        # that when an MDBase object is created
        md = MDBase.create_obj(baseobj)

        # it doesn't modify the passed in object (when adding
        # generated properties)
        self.assertEqual(baseobj, origbase)

        # and it has the generated properties
        # Note: cannot mock the functions as they are already
        # referenced at creation time
        self.assertIn('uuid', md)
        self.assertIn('modified', md)

        # That you can create a new version using new_version
        md2 = md.new_version(('dc:creator', 'Jim Bob',))

        # that they are different
        self.assertNotEqual(md, md2)

        # and that the new modified time is different from the old
        self.assertNotEqual(md.modified, md2.modified)

        # and that the modification is present
        self.assertEqual(md2['dc:creator'], [ 'Jim Bob' ])

        # that providing a value from common property
        fvalue = 'fakesig'
        md3 = md.new_version(('sig', fvalue))

        # gets set directly, and is not a list
        self.assertEqual(md3.sig, fvalue)

        # that invalid attribute access raises correct exception
        self.assertRaises(AttributeError, getattr, md, 'somerandombogusattribute')

    def test_mdbase_encode_decode(self):
        # that an object
        baseobj = {
            'type': 'metadata',
            'created_by_ref': self.created_by_ref,
        }
        obj = MDBase.create_obj(baseobj)

        # can be encoded
        coded = obj.encode()

        # and that the results can be decoded
        decobj = MDBase.decode(coded)

        # and that they are equal
        self.assertEqual(obj, decobj)

        # and in the encoded object
        eobj = _asn1coder.loads(coded)

        # the uuid property is a str instance
        self.assertIsInstance(eobj['uuid'], str)

        # and has the length of 16
        self.assertEqual(len(eobj['uuid']), 16)

    def test_mdbase_wrong_type(self):
        # that created_by_ref can be passed by kw
        obj = MetaData(created_by_ref=self.created_by_ref)

        # and that another class refuses an object of the wrong type
        self.assertRaises(ValueError, FileObject, dict(obj.items(False)))

    def test_makehash(self):
        # that a missing type, upper case hex digits and an unknown
        # type are rejected
        self.assertRaises(ValueError, ObjectStore.makehash, 'slkj')
        self.assertRaises(ValueError, ObjectStore.makehash, 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA')
        self.assertRaises(ValueError, ObjectStore.makehash, 'bogushash:9e0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA', strict=False)

        # that when not strict, the type is detected from the length
        self.assertEqual(ObjectStore.makehash('cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', strict=False), 'sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
        self.assertEqual(ObjectStore.makehash('e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', strict=False), 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')

    def test_enumeratedir(self):
        files = enumeratedir(self.tempdir, self.created_by_ref)
        ftest = files[0]
        fname = 'test.txt'

        # make sure that they are of type MDBase
        self.assertIsInstance(ftest, MDBase)

        oldid = ftest.id
        self.assertEqual(ftest.filename, fname)
        self.assertEqual(ftest.dir, self.tempdir)
        # XXX - do we add host information?
        self.assertEqual(ftest.id, uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
            '/'.join(os.path.split(self.tempdir) +
            ( fname, ))))
        self.assertEqual(ftest.mtime, datetime.datetime(2019, 5, 20, 21, 47, 36))
        self.assertEqual(ftest.size, 15)
        self.assertIn('sha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f', ftest.hashes)

        # XXX - make sure works w/ relative dirs
        files = enumeratedir(os.path.relpath(self.tempdir),
            self.created_by_ref)
        self.assertEqual(oldid, files[0].id)

    def test_mdbaseoverlay(self):
        objst = ObjectStore(self.created_by_ref)

        # that a base object
        bid = uuid.uuid4()
        objst.loadobj({
            'type': 'metadata',
            'uuid': bid,
            'modified': datetime.datetime(2019, 6, 10, 14, 3, 10),
            'created_by_ref': self.created_by_ref,
            'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
            'someprop': [ 'somevalue' ],
            'lang': 'en',
        })

        # can have an overlay object
        oid = uuid.uuid4()
        dhash = 'sha256:a7c96262c21db9a06fd49e307d694fd95f624569f9b35bb3ffacd880440f9787'
        objst.loadobj({
            'type': 'metadata',
            'uuid': oid,
            'modified': datetime.datetime(2019, 6, 10, 18, 3, 10),
            'created_by_ref': self.created_by_ref,
            'hashes': [ dhash ],
            'parent_refs': [ bid ],
            'lang': 'en',
        })

        # and that when you get its properties
        oobj = objst.by_id(oid)
        odict = dict(oobj.items())

        # that it has the overlays property
        self.assertEqual(odict['parent_refs'], [ bid ])

        # that it doesn't have a common property
        self.assertNotIn('type', odict)

        # that when skipcommon is False
        odict = dict(oobj.items(False))

        # that it does have a common property
        self.assertIn('type', odict)

    def test_persona(self):
        # that a newly created persona
        persona = Persona()

        # has an identity object
        idobj = persona.get_identity()

        # and that it has a uuid attribute that matches
        self.assertEqual(persona.uuid, idobj['uuid'])

        # that a key can be generated
        persona.generate_key()

        # that the pubkey property is present
        idobj = persona.get_identity()
        self.assertIsInstance(idobj['pubkey'], str)

        # that get_pubkey returns the correct thing
        pubstr = _asn1coder.dumps([ idobj.uuid, idobj['pubkey'] ])
        self.assertEqual(persona.get_pubkey(),
            base58.b58encode_check(pubstr))

        # and that there is a signature
        self.assertIsInstance(idobj['sig'], str)

        # and that it can verify itself
        persona.verify(idobj)

        # and that a new persona can be created from the pubkey
        pkpersona = Persona.from_pubkey(persona.get_pubkey())

        # and that it can verify the old identity
        self.assertTrue(pkpersona.verify(idobj))

        # that a second time, it raises an exception
        self.assertRaises(RuntimeError, persona.generate_key)

        # that a file object created by it
        testfname = os.path.join(self.tempdir, 'test.txt')
        testobj = persona.by_file(testfname)

        # has the correct created_by_ref
        self.assertEqual(testobj.created_by_ref, idobj.uuid)

        # and has a signature
        self.assertIn('sig', testobj)

        # that a persona created from the identity object
        vpersona = Persona(idobj)

        # can verify the sig
        self.assertTrue(vpersona.verify(testobj))

        # and that a bogus signature
        bogussig = 'somebogussig'
        bogusobj = MDBase.create_obj(testobj)
        bogusobj.sig = bogussig

        # fails to verify
        self.assertRaises(Exception, vpersona.verify, bogusobj)

        # and that a modified object
        otherobj = testobj.new_version(('customprop', 'value'))

        # fails to verify
        self.assertRaises(Exception, vpersona.verify, otherobj)

        # that a persona object can be written
        perpath = os.path.join(self.basetempdir, 'persona.pasn1')
        persona.store(perpath)

        # and that when loaded back
        loadpersona = Persona.load(perpath)

        # the new persona object can sign an object
        nvtestobj = loadpersona.sign(testobj.new_version())

        # and the old persona can verify it.
        self.assertTrue(vpersona.verify(nvtestobj))

    def test_persona_metadata(self):
        # that a persona
        persona = Persona()
        persona.generate_key()

        # can create a metadata object
        hashobj = ['asdlfkj']
        mdobj = persona.MetaData(hashes=hashobj)

        # that the object has the correct created_by_ref
        self.assertEqual(mdobj.created_by_ref, persona.uuid)

        # and has the provided hashes
        self.assertEqual(mdobj.hashes, hashobj)

        # and that it can be verified
        persona.verify(mdobj)

    def test_objectstore(self):
        objst = ObjectStore.load(os.path.join('fixtures', 'sample.data.pasn1'))

        objst.loadobj({
            'type': 'metadata',
            'uuid': uuid.UUID('c9a1d1e2-3109-4efd-8948-577dc15e44e7'),
            'modified': datetime.datetime(2019, 5, 31, 14, 3, 10),
            'created_by_ref': self.created_by_ref,
            'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
            'lang': 'en',
        })

        lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
        self.assertEqual(len(lst), 2)

        byid = objst.by_id('3e466e06-45de-4ecc-84ba-2d2a3d970e96')

        self.assertIsInstance(byid, MetaData)
        self.assertIn(byid, lst)

        r = byid

        self.assertEqual(r.uuid, uuid.UUID('3e466e06-45de-4ecc-84ba-2d2a3d970e96'))
        self.assertEqual(r['dc:creator'], [ u'John-Mark Gurney' ])

        # that the store can be written out, and read back
        fname = 'testfile.pasn1'
        objst.store(fname)

        with open(fname) as fp:
            objs = _asn1coder.loads(fp.read())

        os.unlink(fname)

        # NOTE(review): len(objs) is the number of keys of the stored
        # dictionary, not the number of stored objects -- confirm that
        # this is the comparison intended.
        self.assertEqual(len(objs), len(objst))

        self.assertEqual(objs['created_by_ref'], self.created_by_ref.bytes)

        for i in objs['objects']:
            i['created_by_ref'] = uuid.UUID(bytes=i['created_by_ref'])
            i['uuid'] = uuid.UUID(bytes=i['uuid'])
            self.assertEqual(objst.by_id(i['uuid']), i)

        # that looking up by file works, including a second time
        testfname = os.path.join(self.tempdir, 'test.txt')
        self.assertEqual(objst.by_file(testfname), [ byid ])
        self.assertEqual(objst.by_file(testfname), [ byid ])

        self.assertRaises(KeyError, objst.by_file, '/dev/null')

        # XXX make sure that object store contains fileobject

        # Tests to add:
        # Non-duplicates when same metadata is located by multiple hashes.

    def test_main(self):
        # Test the main runner, this is only testing things that are
        # specific to running the program, like where the store is
        # created.

        # setup object store
        storefname = os.path.join(self.tempdir, 'storefname')
        identfname = os.path.join(self.tempdir, 'identfname')
        shutil.copy(os.path.join('fixtures', 'sample.data.pasn1'), storefname)

        # setup path mapping
        def expandusermock(arg):
            if arg == '~/.medashare_store.pasn1':
                return storefname
            elif arg == '~/.medashare_identity.pasn1':
                return identfname

        # setup test fname
        testfname = os.path.join(self.tempdir, 'test.txt')

        import sys
        import StringIO
        import itertools

        with mock.patch('os.path.expanduser', side_effect=expandusermock) \
            as eu:
            # that generating a new identity
            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-g', '-a', 'name=A Test User' ])) as (stdout, argv):
                main()

                # does not output anything
                self.assertEqual(stdout.getvalue(), '')

                # looks up the correct file
                eu.assert_called_with('~/.medashare_identity.pasn1')

                # and that the identity
                persona = Persona.load(identfname)
                pident = persona.get_identity()

                # has the correct name
                self.assertEqual(pident.name, 'A Test User')

            # that when generating an identity when one already exists
            with nested(mock.patch('sys.stderr',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-g', '-a', 'name=A Test User' ])) as (stderr, argv):
                # that it exits
                with self.assertRaises(SystemExit) as cm:
                    main()

                # with error code 1
                self.assertEqual(cm.exception[0], 1)

                # and outputs an error message
                self.assertEqual(stderr.getvalue(),
                    'Error: Identity already created.\n')

                # and looked up the correct file
                eu.assert_called_with('~/.medashare_identity.pasn1')

            # that when updating the identity
            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-i', '-a', 'name=Changed Name' ])) as (stdout, argv):
                main()

                # it doesn't output anything
                self.assertEqual(stdout.getvalue(), '')

                # and looked up the correct file
                eu.assert_called_with('~/.medashare_identity.pasn1')

                npersona = Persona.load(identfname)
                nident = npersona.get_identity()

                # and has the new name
                self.assertEqual(nident.name, 'Changed Name')

                # and has the same old uuid
                self.assertEqual(nident.uuid, pident.uuid)

                # and that the modified date has changed
                self.assertNotEqual(pident.modified, nident.modified)

                # and that the old Persona can verify the new one
                self.assertTrue(persona.verify(nident))

            # that when asked to print the public key
            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-p' ])) as (stdout, argv):
                main()

                # the correct key is printed
                self.assertEqual(stdout.getvalue(),
                    '%s\n' % persona.get_pubkey())

                # and looked up the correct file
                eu.assert_called_with('~/.medashare_identity.pasn1')

            # that listing prints the metadata of the file
            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-l', testfname ])) as (stdout, argv):
                main()
                self.assertEqual(stdout.getvalue(),
                    'dc:creator:\tJohn-Mark Gurney\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
                eu.assert_called_with('~/.medashare_store.pasn1')

            # that properties can be added, only splitting on the first =
            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-a', 'dc:creator=Another user', '-a', 'foo=bar=baz', testfname ])) as (stdout, argv):
                main()

            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-l', testfname ])) as (stdout, argv):
                main()
                self.assertEqual(stdout.getvalue(),
                    'dc:creator:\tAnother user\ndc:creator:\tJohn-Mark Gurney\nfoo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')

            # that deleting by tag removes all of its values
            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-d', 'dc:creator', testfname ])) as (stdout, argv):
                main()

            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-l', testfname ])) as (stdout, argv):
                main()
                self.assertEqual(stdout.getvalue(),
                    'foo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')

            # that a second value can be added to a tag
            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-a', 'foo=bleh', testfname ])) as (stdout, argv):
                main()

            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-l', testfname ])) as (stdout, argv):
                main()
                self.assertEqual(stdout.getvalue(),
                    'foo:\tbar=baz\nfoo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')

            # that deleting by tag=value removes only that value
            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-d', 'foo=bar=baz', testfname ])) as (stdout, argv):
                main()

            with nested(mock.patch('sys.stdout',
                StringIO.StringIO()), mock.patch('sys.argv',
                [ 'progname', '-l', testfname ])) as (stdout, argv):
                main()
                self.assertEqual(stdout.getvalue(),
                    'foo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
|