|
- #!/usr/bin/env python
-
- import hashlib
- import pasn1
- import os.path
- import shutil
- import string
- import tempfile
- import unittest
- import uuid
-
# Names of the hash algorithms accepted in '<algorithm>:<hexdigest>'
# hash strings.
_validhashes = {'sha256', 'sha512'}

# Reverse lookup from the length of a hex digest to the algorithm that
# produces digests of that length.  Used to guess the algorithm of a
# hash string that carries no '<algorithm>:' prefix.
_hashlengths = dict(
    (len(hashlib.new(name).hexdigest()), name) for name in _validhashes
)
-
- # XXX - add validation
class ObjWrap(object):
    '''This is a simple wrapper that turns a JSON object (a dict) into
    a pythonesque object where attribute accesses work.

    Both obj.key and obj['key'] return the value stored under key in
    the wrapped dict.  A key that is not present raises KeyError in
    both cases (attribute access included).'''

    def __init__(self, obj):
        self._obj = obj

    def __getattr__(self, k):
        # __getattr__ is only called when normal attribute lookup
        # fails.  If _obj itself is missing (instance created without
        # running __init__, as copy and pickle do), looking it up
        # through self._obj would recurse without end, so fail
        # cleanly instead.
        if k == '_obj':
            raise AttributeError(k)

        return self._obj[k]

    def __getitem__(self, k):
        return self._obj[k]

    def __to_dict__(self):
        '''Return the wrapped dict itself (not a copy).'''

        return self._obj

    def __eq__(self, o):
        '''Compare the wrapped dict against o.

        o may be a plain dict or another ObjWrap; in the latter case
        the comparison is reflected to o, which compares its own
        wrapped dict.'''

        # cmp() does not exist on Python 3; == gives the same answer
        # on both Python 2 and Python 3.
        return self._obj == o

    def __ne__(self, o):
        # Python 2 does not derive != from __eq__, so keep the two
        # operators consistent explicitly.
        return not self.__eq__(o)
-
def _trytodict(o):
    '''Coerce o into a ('dict', mapping) pair via its __to_dict__
    method.

    Any failure while obtaining or calling __to_dict__ is reported as
    a TypeError naming the type of o.'''

    try:
        converted = o.__to_dict__()
    except Exception:
        raise TypeError('unable to find __to_dict__ on %s' % type(o))

    return 'dict', converted
-
# Shared coder used by ObjectStore.store and ObjectStore.load to
# serialize and parse the stored objects.  _trytodict is passed as the
# coerce hook -- presumably so that objects providing __to_dict__ (such
# as ObjWrap) get encoded as dicts; confirm against the pasn1 docs.
_asn1coder = pasn1.ASN1DictCoder(coerce=_trytodict)
-
class ObjectStore(object):
    '''A container to store the various Metadata objects.

    Every loaded object is indexed by its UUID (see by_id) and by
    each of its hashes (see by_hash).  The length of the store is the
    number of distinct UUIDs it holds.'''

    def __init__(self):
        # uuid.UUID -> object
        self._uuids = {}
        # normalized '<algorithm>:<hexdigest>' string -> list of objects
        self._hashes = {}

    @staticmethod
    def makehash(hashstr, strict=True):
        '''Take a hash string, and return a valid hash string from it.

        A valid hash string has the form '<algorithm>:<hexdigest>'
        where the algorithm is one of _validhashes.

        If strict is True, hashstr must already be in that form and
        the digest must consist only of lower case hex digits,
        otherwise ValueError is raised.

        If strict is False and hashstr is not of the form 'a:b', the
        algorithm is detected from the length of hashstr.  KeyError
        is raised if no known algorithm has digests of that length.

        ValueError is raised if the algorithm is not supported.

        NOTE: when the algorithm is given explicitly, the length of
        the digest is not checked.'''

        try:
            algo, value = hashstr.split(':')
        except ValueError:
            # Not exactly one ':' in the string.
            if strict:
                raise

            algo = _hashlengths[len(hashstr)]
            value = hashstr

        # A set difference works on both Python 2 and Python 3, unlike
        # the two argument form of str.translate.
        if strict and set(value).difference(string.hexdigits.lower()):
            raise ValueError('value has invalid hex digits (must be lower case)', value)

        if algo in _validhashes:
            return ':'.join((algo, value))

        raise ValueError('unsupported hash type', algo)

    def __len__(self):
        return len(self._uuids)

    def store(self, fname):
        '''Write out the objects in the store to the file named
        fname.'''

        # The coder output is an ASN.1 encoding, i.e. binary data, so
        # the file must not be opened in text mode.  The values are
        # passed as a real list as on Python 3 dict.values() is only a
        # view.
        with open(fname, 'wb') as fp:
            fp.write(_asn1coder.dumps(list(self._uuids.values())))

    def _unindex(self, obj):
        '''Remove obj from the hash index.'''

        for j in obj.hashes:
            h = self.makehash(j)
            remaining = [ x for x in self._hashes.get(h, ()) if x is not obj ]
            if remaining:
                self._hashes[h] = remaining
            else:
                self._hashes.pop(h, None)

    def loadobj(self, obj):
        '''Load obj into the data store.

        obj is either an ObjWrap or a dict (which will be wrapped).
        It must provide a uuid and a list of hashes in strict form,
        see makehash.

        An object with the same UUID as an already loaded object
        replaces that object.'''

        if not isinstance(obj, ObjWrap):
            obj = ObjWrap(obj)

        # Parse and validate everything before touching the indexes,
        # so that a bad object cannot leave the store half updated.
        uid = uuid.UUID(obj.uuid)
        hashes = [ self.makehash(j) for j in obj.hashes ]

        # Drop the object being replaced from the hash index as well,
        # otherwise by_hash would keep returning an object that is no
        # longer in the store.
        old = self._uuids.get(uid)
        if old is not None:
            self._unindex(old)

        self._uuids[uid] = obj
        for h in hashes:
            self._hashes.setdefault(h, []).append(obj)

    def load(self, fname):
        '''Load objects from the provided file name.

        Basic validation will be done on the objects in the file.

        The objects will be accessible via other methods.'''

        # Binary mode, to match store.
        with open(fname, 'rb') as fp:
            objs = _asn1coder.loads(fp.read())

        for i in objs:
            self.loadobj(i)

    def by_id(self, id):
        '''Look up an object by its UUID.

        id is the string form of the UUID.  KeyError is raised if no
        such object has been loaded.'''

        uid = uuid.UUID(id)
        return self._uuids[uid]

    def by_hash(self, hash):
        '''Look up objects by hash value.

        hash may be given with or without the '<algorithm>:' prefix.
        Returned is the list of objects having that hash.  KeyError
        is raised if there is no such object.'''

        h = self.makehash(hash, strict=False)
        return self._hashes[h]
-
class FileObject(object):
    '''A file, described by the directory it lives in and its name.'''

    def __init__(self, _dir, filename):
        self._fname = filename
        self._dir = _dir

    @property
    def dir(self):
        '''The directory of the file.'''

        return self._dir

    @property
    def filename(self):
        '''The name of the file.'''

        return self._fname

    @property
    def id(self):
        '''The UUID of the path to this file.

        This is a version 5 UUID in the URL namespace, derived from
        the directory and the file name.'''

        # XXX make sure this is correct
        head, tail = os.path.split(self._dir)
        path = '/'.join((head, tail, self._fname))

        return uuid.uuid5(uuid.NAMESPACE_URL, 'someurl' + path)
-
def enumeratedir(_dir):
    '''Enumerate all the files and directories (not recursive) in _dir.

    Returned is a list of FileObjects, in the order os.listdir
    returns the names.'''

    # Build a real list: map() only returns a list on Python 2, on
    # Python 3 it returns an iterator that cannot be indexed.
    return [ FileObject(_dir, name) for name in os.listdir(_dir) ]
-
class _TestCases(unittest.TestCase):
    '''Tests for ObjectStore and enumeratedir.

    The fixtures are looked up relative to the current directory, so
    the tests have to be run from the directory containing the
    'fixtures' directory.'''

    def setUp(self):
        # Each test works on a private copy of the fixture files,
        # below a temporary directory that tearDown removes.
        d = tempfile.mkdtemp()
        self.basetempdir = d
        self.tempdir = os.path.join(d, 'subdir')

        shutil.copytree(os.path.join('fixtures', 'testfiles'),
            self.tempdir)

    def tearDown(self):
        shutil.rmtree(self.basetempdir)
        self.tempdir = None

    def test_makehash(self):
        # no hash type, and strict mode does not guess it
        self.assertRaises(ValueError, ObjectStore.makehash, 'slkj')
        # upper case hex digits are not allowed
        self.assertRaises(ValueError, ObjectStore.makehash, 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA')

        # the hash type is detected from the length when not strict
        self.assertEqual(ObjectStore.makehash('cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', strict=False), 'sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
        self.assertEqual(ObjectStore.makehash('e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', strict=False), 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')

    def test_enumeratedir(self):
        files = enumeratedir(self.tempdir)
        ftest = files[0]
        fname = 'test.txt'

        self.assertEqual(ftest.filename, fname)
        self.assertEqual(ftest.dir, self.tempdir)
        self.assertEqual(ftest.id, uuid.uuid5(uuid.NAMESPACE_URL,
            'someurl' + '/'.join(os.path.split(self.tempdir) +
            ( fname, ))))

    def test_objectstore(self):
        objst = ObjectStore()

        objst.load(os.path.join('fixtures', 'sample.data.pasn1'))

        objst.loadobj({
            'type': 'metadata',
            'uuid': 'c9a1d1e2-3109-4efd-8948-577dc15e44e7',
            'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
            'lang': 'en',
        })

        lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
        self.assertEqual(len(lst), 2)

        byid = objst.by_id('3e466e06-45de-4ecc-84ba-2d2a3d970e96')

        self.assertIn(byid, lst)

        r = byid

        self.assertEqual(r.uuid, '3e466e06-45de-4ecc-84ba-2d2a3d970e96')
        self.assertEqual(r['dc:author'], 'John-Mark Gurney')

        # Write below the temporary directory so that tearDown removes
        # the file, instead of leaving it in the current directory.
        storefname = os.path.join(self.basetempdir, 'testfile.pasn1')
        objst.store(storefname)

        # The stored data is binary, read it back as such.
        with open(storefname, 'rb') as fp:
            objs = _asn1coder.loads(fp.read())

        self.assertEqual(len(objs), len(objst))

        for i in objs:
            self.assertEqual(objst.by_id(i['uuid']), i)
|