diff --git a/ui/cli.py b/ui/cli.py
new file mode 100644
index 0000000..0e1c50d
--- /dev/null
+++ b/ui/cli.py
@@ -0,0 +1,217 @@
+#!/usr/bin/env python
+
+import hashlib
+import pasn1
+import os.path
+import shutil
+import string
+import tempfile
+import unittest
+import uuid
+
+_validhashes = set([ 'sha256', 'sha512' ])
+_hashlengths = { len(getattr(hashlib, x)().hexdigest()): x for x in _validhashes }
+
+# XXX - add validation
+class ObjWrap(object):
+    '''This is a simple wrapper that turns a JSON object into a pythonesque
+    object where attribute accesses work.'''
+
+    def __init__(self, obj):
+        self._obj = obj
+
+    def __getattr__(self, k):
+        return self._obj[k]
+
+    def __getitem__(self, k):
+        return self._obj[k]
+
+    def __to_dict__(self):
+        return self._obj
+
+    def __eq__(self, o):
+        return cmp(self._obj, o) == 0
+
+def _trytodict(o):
+    try:
+        return 'dict', o.__to_dict__()
+    except Exception:
+        raise TypeError('unable to find __to_dict__ on %s' % type(o))
+
+_asn1coder = pasn1.ASN1DictCoder(coerce=_trytodict)
+
+class ObjectStore(object):
+    '''A container to store the various Metadata objects.'''
+
+    def __init__(self):
+        self._uuids = {}
+        self._hashes = {}
+
+    @staticmethod
+    def makehash(hashstr, strict=True):
+        '''Take a hash string and return a valid hash string from it.
+
+        This makes sure that it is of the correct type and length.
+
+        If strict is False, the function will detect the length and
+        return a valid hash if one can be found.'''
+
+        try:
+            hash, value = hashstr.split(':')
+        except ValueError:
+            if strict:
+                raise
+
+            hash = _hashlengths[len(hashstr)]
+            value = hashstr
+
+        if strict and len(str(value).translate(None, string.hexdigits.lower())) != 0:
+            raise ValueError('value has invalid hex digits (must be lower case)', value)
+
+        if hash in _validhashes:
+            return ':'.join((hash, value))
+
+        raise ValueError
+
+    def __len__(self):
+        return len(self._uuids)
+
+    def store(self, fname):
+        '''Write out the objects in the store to the file named
+        fname.'''
+
+        with open(fname, 'w') as fp:
+            fp.write(_asn1coder.dumps(self._uuids.values()))
+
+    def loadobj(self, obj):
+        '''Load obj into the data store.'''
+
+        if not isinstance(obj, ObjWrap):
+            obj = ObjWrap(obj)
+
+        id = uuid.UUID(obj.uuid)
+        self._uuids[id] = obj
+        for j in obj.hashes:
+            h = self.makehash(j)
+            self._hashes.setdefault(h, []).append(obj)
+
+    def load(self, fname):
+        '''Load objects from the provided file name.
+
+        Basic validation will be done on the objects in the file.
+
+        The objects will be accessible via other methods.'''
+
+        with open(fname) as fp:
+            objs = _asn1coder.loads(fp.read())
+
+        for i in objs:
+            self.loadobj(i)
+
+    def by_id(self, id):
+        '''Look up an object by its UUID.'''
+
+        uid = uuid.UUID(id)
+        return self._uuids[uid]
+
+    def by_hash(self, hash):
+        '''Look up an object by its hash value.'''
+
+        h = self.makehash(hash, strict=False)
+        return self._hashes[h]
+
+class FileObject(object):
+    def __init__(self, _dir, filename):
+        self._dir = _dir
+        self._fname = filename
+
+    @property
+    def filename(self):
+        '''The name of the file.'''
+
+        return self._fname
+
+    @property
+    def dir(self):
+        '''The directory of the file.'''
+
+        return self._dir
+
+    @property
+    def id(self):
+        '''The UUID of the path to this file.'''
+
+        # XXX make sure this is correct
+        return uuid.uuid5(uuid.NAMESPACE_URL, 'someurl' + '/'.join(os.path.split(self._dir) + ( self._fname, )))
+
+def enumeratedir(_dir):
+    '''Enumerate all the files and directories (not recursive) in _dir.
+
+    Returns a list of FileObjects.'''
+
+    return map(lambda x: FileObject(_dir, x), os.listdir(_dir))
+
+class _TestCases(unittest.TestCase):
+    def setUp(self):
+        d = tempfile.mkdtemp()
+        self.basetempdir = d
+        self.tempdir = os.path.join(d, 'subdir')
+
+        shutil.copytree(os.path.join('fixtures', 'testfiles'),
+            self.tempdir)
+
+    def tearDown(self):
+        shutil.rmtree(self.basetempdir)
+        self.tempdir = None
+
+    def test_makehash(self):
+        self.assertRaises(ValueError, ObjectStore.makehash, 'slkj')
+        self.assertRaises(ValueError, ObjectStore.makehash, 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA')
+
+        self.assertEqual(ObjectStore.makehash('cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', strict=False), 'sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
+        self.assertEqual(ObjectStore.makehash('e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', strict=False), 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')
+
+    def test_enumeratedir(self):
+        files = enumeratedir(self.tempdir)
+        ftest = files[0]
+        fname = 'test.txt'
+
+        self.assertEqual(ftest.filename, fname)
+        self.assertEqual(ftest.dir, self.tempdir)
+        self.assertEqual(ftest.id, uuid.uuid5(uuid.NAMESPACE_URL,
+            'someurl' + '/'.join(os.path.split(self.tempdir) +
+            ( fname, ))))
+
+    def test_objectstore(self):
+        objst = ObjectStore()
+
+        objst.load(os.path.join('fixtures', 'sample.data.pasn1'))
+
+        objst.loadobj({
+            'type': 'metadata',
+            'uuid': 'c9a1d1e2-3109-4efd-8948-577dc15e44e7',
+            'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
+            'lang': 'en',
+        })

+        lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
+        self.assertEqual(len(lst), 2)
+
+        byid = objst.by_id('3e466e06-45de-4ecc-84ba-2d2a3d970e96')
+
+        self.assertIn(byid, lst)
+
+        r = byid
+
+        self.assertEqual(r.uuid, '3e466e06-45de-4ecc-84ba-2d2a3d970e96')
+        self.assertEqual(r['dc:author'], 'John-Mark Gurney')
+
+        objst.store('testfile.pasn1')
+
+        with open('testfile.pasn1') as fp:
+            objs = _asn1coder.loads(fp.read())
+
+        self.assertEqual(len(objs), len(objst))
+
+        for i in objs:
+            self.assertEqual(objst.by_id(i['uuid']), i)
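A minimal usage sketch of the ObjectStore and enumeratedir API added above. This is illustrative only: it assumes the pasn1 package is available and that the new module is importable as ui.cli, and the UUID, hash, and output file name are made-up values, not fixtures from this repository.

    # Usage sketch for the API in this patch; assumptions noted above.
    from ui.cli import ObjectStore, enumeratedir

    objst = ObjectStore()

    # loadobj() wraps a plain dict in ObjWrap and indexes it by its UUID
    # and by each of its hashes.
    objst.loadobj({
        'type': 'metadata',
        'uuid': 'c9a1d1e2-3109-4efd-8948-577dc15e44e7',
        'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
        'lang': 'en',
    })

    # Look the object up by UUID, or by a bare digest; makehash() detects
    # the algorithm from the digest length when strict=False.
    obj = objst.by_id('c9a1d1e2-3109-4efd-8948-577dc15e44e7')
    matches = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
    assert obj in matches

    # Persist the store (encoded via pasn1) and enumerate a directory into
    # FileObject instances.
    objst.store('metadata.pasn1')
    names = [ (f.filename, f.id) for f in enumeratedir('.') ]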