@@ -2,6 +2,11 @@
#import pdb, sys; mypdb = pdb.Pdb(stdout=sys.stderr); mypdb.set_trace()
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, \
Ed448PublicKey
from cryptography.hazmat.primitives.serialization import Encoding, \
PrivateFormat, PublicFormat, NoEncryption
import copy
import datetime
import hashlib
@@ -35,21 +40,48 @@ def _iterdictlist(obj):
else:
yield k, v
def _makeuuid(s):
if isinstance(s, uuid.UUID):
return s
return uuid.UUID(s)
# XXX - add validation
# XXX - how to add singletons
class MDBase(object):
'''This is a simple wrapper that turns a JSON object into a pythonesc
object where attribute accesses work.'''
_type = 'invalid'
_generated_properties = {
'uuid': uuid.uuid4,
'modified': datetime.datetime.utcnow
}
# When decoding, the decoded value should be passed to this function
# to get the correct type
_instance_properties = {
'uuid': _makeuuid,
'created_by_ref': _makeuuid,
}
_common_properties = [ 'type', 'created_by_ref' ] # XXX - add lang?
_common_optional = [ 'overlay_refs' ]
_common_optional = [ 'overlay_refs', 'sig' ]
_common_names = set(_common_properties + _generated_properties.keys())
def __init__(self, obj={}):
def __init__(self, obj={}, **kwargs ):
obj = copy.deepcopy(obj)
obj.update(kwargs)
if self._type == MDBase._type:
raise ValueError('call MDBase.create_obj instead so correct class is used.')
if 'type' in obj and obj['type'] != self._type:
raise ValueError(
'trying to create the wrong type of object, got: %s, expected: %s' %
(`obj['type']`, `self._type`))
if 'type' not in obj:
obj['type'] = self._type
@@ -57,6 +89,10 @@ class MDBase(object):
if x not in obj:
raise ValueError('common property %s not present' % `x`)
for x, fun in self._instance_properties.iteritems():
if x in obj:
obj[x] = fun(obj[x])
for x, fun in self._generated_properties.iteritems():
if x not in obj:
obj[x] = fun()
@@ -84,7 +120,7 @@ class MDBase(object):
def new_version(self, *args):
'''For each k, v pari, add the property k as an additional one
(or new on if first), with the value v.'''
(or new one if first), with the value v.'''
obj = copy.deepcopy(self._obj)
@@ -95,9 +131,18 @@ class MDBase(object):
return self.create_obj(obj)
def __repr__(self): # pragma: no cover
return '%s(%s)' % (self.__class__.__name__, `self._obj`)
def __getattr__(self, k):
return self._obj[k]
def __setattr__(self, k, v):
if k[0] == '_': # direct attribute
self.__dict__[k] = v
else:
self._obj[k] = v
def __getitem__(self, k):
return self._obj[k]
@@ -111,8 +156,15 @@ class MDBase(object):
return k in self._obj
def items(self, skipcommon=True):
return [ (k, v) for k, v in self._obj.items() if k not in
self._common_names ]
return [ (k, v) for k, v in self._obj.iteritems() if
not skipcommon or k not in self._common_names ]
def encode(self):
return _asn1coder.dumps(self)
@classmethod
def decode(cls, s):
return cls.create_obj(_asn1coder.loads(s))
class MetaData(MDBase):
_type = 'metadata'
@@ -121,8 +173,10 @@ class Identity(MDBase):
_type = 'identity'
# Identites don't need a created by
_common_properties = [ x for x in MDBase._common_properties if x != 'created_by_ref' ]
_common_optional = [ 'name', 'pubkey' ]
_common_properties = [ x for x in MDBase._common_properties if x !=
'created_by_ref' ]
_common_optional = [ x for x in MDBase._common_optional if x !=
'overlay_refs' ] + [ 'name', 'pubkey' ]
def _trytodict(o):
if isinstance(o, uuid.UUID):
@@ -139,39 +193,102 @@ class Persona(object):
create the proper identity object, serialize for saving keys,
create objects for that persona and other management.'''
def __init__(self, identity=None):
def __init__(self, identity=None, key=None ):
if identity is None:
self._identity = Identity()
else:
self._identity = identity
self._key = key
self._pubkey = None
if 'pubkey' in self._identity:
pubkeybytes = self._identity.pubkey[0]
self._pubkey = Ed448PublicKey.from_public_bytes(pubkeybytes)
self._created_by_ref = self._identity.uuid
def get_identity(self):
'''Return the Identity object for this Persona.'''
return self._identity
def store(self, fname):
'''Store the Persona to a file. If there is a private
key associated w/ the Persona, it will be saved as well.'''
with open(fname, 'w') as fp:
obj = {
'identity': self._identity,
}
if self._key is not None:
obj['key'] = \
self._key.private_bytes(Encoding.Raw,
PrivateFormat.Raw, NoEncryption())
fp.write(_asn1coder.dumps(obj))
@classmethod
def load(cls, fname):
'''Load the Persona from the provided file.'''
with open(fname) as fp:
objs = _asn1coder.loads(fp.read())
return cls(Identity(objs['identity']))
kwargs = {}
if 'key' in objs:
kwargs['key'] = Ed448PrivateKey.from_private_bytes(objs['key'])
return cls(Identity(objs['identity']), **kwargs)
def generate_key(self):
'''Generate a key for this Identity.
Raises a RuntimeError if a key is already present.'''
if self._key:
raise RuntimeError('a key already exists')
self._key = Ed448PrivateKey.generate()
self._pubkey = self._key.public_key()
pubkey = self._pubkey.public_bytes(Encoding.Raw,
PublicFormat.Raw)
self._identity = self._identity.new_version(('pubkey',
pubkey))
def _makesigbytes(self, obj):
obj = dict(obj.items(False))
try:
del obj['sig']
except KeyError:
pass
return _asn1coder.dumps(obj)
def sign(self, obj):
'''Takes the object, adds a signature, and returns the new
object.'''
sigbytes = self._makesigbytes(obj)
sig = self._key.sign(sigbytes)
newobj = MDBase.create_obj(obj)
newobj.sig = sig
return newobj
def verify(self, obj):
sigbytes = self._makesigbytes(obj)
self._pubkey.verify(obj['sig'], sigbytes)
return True
def by_file(self, fname):
'''Return a metadata object for the file named fname.'''
fid = FileObject.make_id(fname)
fobj = FileObject.from_file(fname, self._created_by_ref)
return fobj
return self.sign( fobj)
class ObjectStore(object):
'''A container to store for the various Metadata objects.'''
@@ -229,12 +346,7 @@ class ObjectStore(object):
obj = MDBase.create_obj(obj)
if not isinstance(obj.uuid, uuid.UUID):
id = uuid.UUID(obj.uuid)
else:
id = obj.uuid
self._uuids[id] = obj
self._uuids[obj.uuid] = obj
for j in obj.hashes:
h = self.makehash(j)
self._hashes.setdefault(h, []).append(obj)
@@ -378,7 +490,7 @@ def main():
obj[key].remove(v)
except ValueError:
del obj[k]
nobj = MDBase(obj)
nobj = MDBase.create_obj (obj)
objstr.loadobj(nobj)
else: # pragma: no cover
raise NotImplementedError
@@ -405,6 +517,7 @@ class _TestCases(unittest.TestCase):
self.tempdir = None
def test_mdbase(self):
self.assertRaises(ValueError, MDBase, created_by_ref='')
self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'unknosldkfj' })
self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'metadata' })
@@ -439,6 +552,29 @@ class _TestCases(unittest.TestCase):
# and that the modification is present
self.assertEqual(md2['dc:creator'], [ 'Jim Bob' ])
def test_mdbase_encode_decode(self):
# that an object
baseobj = {
'type': 'metadata',
'created_by_ref': self.created_by_ref,
}
obj = MDBase.create_obj(baseobj)
# can be encoded
coded = obj.encode()
# and that the rsults can be decoded
decobj = MDBase.decode(coded)
# and that they are equal
self.assertEqual(obj, decobj)
def test_mdbase_wrong_type(self):
# that created_by_ref can be passed by kw
obj = MetaData(created_by_ref=self.created_by_ref)
self.assertRaises(ValueError, FileObject, dict(obj.items(False)))
def test_makehash(self):
self.assertRaises(ValueError, ObjectStore.makehash, 'slkj')
self.assertRaises(ValueError, ObjectStore.makehash, 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA')
@@ -506,6 +642,15 @@ class _TestCases(unittest.TestCase):
# that is has the overlays property
self.assertEqual(odict['overlay_refs'], [ bid ])
# that it doesn't have a common property
self.assertNotIn('type', odict)
# that when skipcommon is False
odict = dict(oobj.items(False))
# that it does have a common property
self.assertIn('type', odict)
def test_persona(self):
# that a newly created persona
persona = Persona()
@@ -513,6 +658,16 @@ class _TestCases(unittest.TestCase):
# has an identity object
idobj = persona.get_identity()
# that a key can be generated
persona.generate_key()
# that a second time, it raises an exception
self.assertRaises(RuntimeError, persona.generate_key)
# that the pubkey property is present
idobj = persona.get_identity()
self.assertIn('pubkey', idobj)
# that a file object created by it
testfname = os.path.join(self.tempdir, 'test.txt')
testobj = persona.by_file(testfname)
@@ -520,6 +675,42 @@ class _TestCases(unittest.TestCase):
# has the correct created_by_ref
self.assertEqual(testobj.created_by_ref, idobj.uuid)
# and has a signature
self.assertIn('sig', testobj)
# that a persona created from the identity object
vpersona = Persona(idobj)
# can verify the sig
self.assertTrue(vpersona.verify(testobj))
# and that a bogus signature
bogussig = 'somebogussig'
bogusobj = MDBase.create_obj(testobj)
bogusobj.sig = bogussig
# fails to verify
self.assertRaises(Exception, vpersona.verify, bogusobj)
# and that a modified object
otherobj = testobj.new_version(('customprop', 'value'))
# fails to verify
self.assertRaises(Exception, vpersona.verify, otherobj)
# that a persona object can be written
perpath = os.path.join(self.basetempdir, 'persona.pasn1')
persona.store(perpath)
# and that when loaded back
loadpersona = Persona.load(perpath)
# the new persona object can sign an object
nvtestobj = loadpersona.sign(testobj.new_version())
# and the old persona can verify it.
self.assertTrue(vpersona.verify(nvtestobj))
def test_objectstore(self):
objst = ObjectStore.load(os.path.join('fixtures', 'sample.data.pasn1'))
@@ -542,7 +733,7 @@ class _TestCases(unittest.TestCase):
r = byid
self.assertEqual(r.uuid, '3e466e06-45de-4ecc-84ba-2d2a3d970e96')
self.assertEqual(r.uuid, uuid.UUID( '3e466e06-45de-4ecc-84ba-2d2a3d970e96'))
self.assertEqual(r['dc:creator'], [ u'John-Mark Gurney' ])
fname = 'testfile.pasn1'
@@ -555,9 +746,11 @@ class _TestCases(unittest.TestCase):
self.assertEqual(len(objs), len(objst))
self.assertEqual(objs['created_by_ref'], self.created_by_ref)
self.assertEqual(objs['created_by_ref'], str(s elf.created_by_ref))
for i in objs['objects']:
i['created_by_ref'] = uuid.UUID(i['created_by_ref'])
i['uuid'] = uuid.UUID(i['uuid'])
self.assertEqual(objst.by_id(i['uuid']), i)
testfname = os.path.join(self.tempdir, 'test.txt')