#!/usr/bin/env python

import datetime
import hashlib
import pasn1
import os.path
import shutil
import string
import tempfile
import unittest
import uuid

# The UUID for the namespace representing the path to a file
_NAMESPACE_MEDASHARE_PATH = uuid.UUID('f6f36b62-3770-4a68-bc3d-dc3e31e429e6')

# Hash algorithm used when hashing files ourselves.
_defaulthash = 'sha512'

# Hash algorithms accepted in "<algorithm>:<hexdigest>" strings.
_validhashes = {'sha256', 'sha512'}

# Maps hex digest length -> algorithm name, so that a bare digest can be
# attributed to an algorithm by its length alone.
_hashlengths = {
    len(getattr(hashlib, x)().hexdigest()): x for x in _validhashes
}

# The only characters allowed in a strictly validated digest.
_lowerhexdigits = frozenset(string.hexdigits.lower())

# Number of bytes read per iteration when hashing a file.
_hashblocksize = 64 * 1024


# XXX - add validation
class MDBase(object):
    '''A simple wrapper that turns a JSON object (a dict) into a
    pythonesque object where attribute accesses work.

    Keys of the wrapped dict are reachable both as attributes
    (obj.uuid) and as items (obj['uuid']).'''

    # Keys that every wrapped object must contain.
    _common_properties = ['uuid', 'type', 'modified', 'created_by_ref']

    def __init__(self, obj):
        '''Wrap obj.

        Raises ValueError if any of the common properties is missing
        from obj.'''

        for x in self._common_properties:
            if x not in obj:
                raise ValueError(
                    'common property %r not present' % (x,))

        self._obj = obj

    @classmethod
    def create_obj(cls, obj):
        '''Using obj as a base, create an instance of MDBase of the
        correct type.

        The type is selected by matching obj['type'] against the _type
        attribute of the direct subclasses of cls.  If the correct
        type is not found, a ValueError is raised.  A KeyError is
        raised if obj has no 'type' key.'''

        ty = obj['type']

        for i in cls.__subclasses__():
            if i._type == ty:
                return i(obj)

        raise ValueError('Unable to find class for type %r' % (ty,))

    def __getattr__(self, k):
        '''Return the value stored under key k of the wrapped object.

        A missing key raises KeyError.'''

        # __getattr__ is only called when normal lookup fails.  If
        # _obj itself is missing (instance built without __init__,
        # e.g. by copy or pickle), the lookup below would call
        # __getattr__ again and recurse without bound.
        if k == '_obj':
            raise AttributeError(k)

        return self._obj[k]

    def __getitem__(self, k):
        '''Return the value stored under key k of the wrapped object.'''

        return self._obj[k]

    def __to_dict__(self):
        '''Return the wrapped dict (not a copy).'''

        return self._obj

    def __eq__(self, o):
        '''Compare the wrapped dict against o, which may be either a
        plain dict or another MDBase.'''

        if isinstance(o, MDBase):
            o = o._obj

        return self._obj == o

    def __ne__(self, o):
        '''Inverse of __eq__.  Python 2 does not derive it
        automatically.'''

        return not self.__eq__(o)


class MetaData(MDBase):
    '''MDBase subclass selected for objects whose type is
    'metadata'.'''

    _type = 'metadata'


def _trytodict(o):
    '''Coerce function handed to the ASN.1 coder: converts o via its
    __to_dict__ method and tags the result as a 'dict'.

    Raises TypeError if the conversion fails for any reason.'''

    try:
        return 'dict', o.__to_dict__()
    except Exception:
        raise TypeError('unable to find __to_dict__ on %s' % type(o))


_asn1coder = pasn1.ASN1DictCoder(coerce=_trytodict)


class ObjectStore(object):
    '''A container to store the various Metadata objects.

    Objects are indexed both by their UUID and by each of the hashes
    they list.'''

    def __init__(self):
        # uuid.UUID -> MDBase
        self._uuids = {}
        # normalized hash string -> list of MDBase
        self._hashes = {}

    @staticmethod
    def makehash(hashstr, strict=True):
        '''Take a hash string, and return a valid hash string from it.

        This makes sure that it is of the correct type and length.

        If strict is False, the function will detect the length and
        return a valid hash if one can be found.

        Raises ValueError if hashstr is not of the form
        "<algorithm>:<digest>" (strict only), if the digest contains
        anything but lower case hex digits (strict only), or if the
        algorithm is not one of the valid hashes.  When strict is
        False and the length of hashstr matches no known algorithm, a
        KeyError is raised.'''

        try:
            hash, value = hashstr.split(':')
        except ValueError:
            if strict:
                raise

            # No (or more than one) separator: guess the algorithm
            # from the length of the whole string.
            hash = _hashlengths[len(hashstr)]
            value = hashstr

        if strict and not _lowerhexdigits.issuperset(value):
            raise ValueError(
                'value has invalid hex digits (must be lower case)',
                value)

        if hash in _validhashes:
            return ':'.join((hash, value))

        raise ValueError('unknown hash type %r' % (hash,))

    def __len__(self):
        '''Return the number of objects in the store.'''

        return len(self._uuids)

    def store(self, fname):
        '''Write out the objects in the store to the file named
        fname.'''

        # The encoded form is binary data, so the file must not be
        # opened in text mode.
        with open(fname, 'wb') as fp:
            fp.write(_asn1coder.dumps(list(self._uuids.values())))

    def loadobj(self, obj):
        '''Load obj into the data store.

        obj is a dict; it is wrapped via MDBase.create_obj and indexed
        by its uuid and by each entry of its hashes.  Any exception
        raised while validating obj leaves the store unmodified.'''

        obj = MDBase.create_obj(obj)
        objid = uuid.UUID(obj.uuid)

        # Validate every hash before touching the indexes so that a
        # bad hash cannot leave the object half indexed.
        hashes = [self.makehash(j) for j in obj.hashes]

        self._uuids[objid] = obj
        for h in hashes:
            self._hashes.setdefault(h, []).append(obj)

    def load(self, fname):
        '''Load objects from the provided file name.

        Basic validation will be done on the objects in the file.

        The objects will be accessible via other methods.'''

        with open(fname, 'rb') as fp:
            objs = _asn1coder.loads(fp.read())

        for i in objs:
            self.loadobj(i)

    def by_id(self, id):
        '''Look up an object by its UUID.

        id is the string form of the UUID.  Raises KeyError if no
        such object is in the store.'''

        uid = uuid.UUID(id)

        return self._uuids[uid]

    def by_hash(self, hash):
        '''Look up objects by hash value.

        hash may be given with or without the "<algorithm>:" prefix.
        Returns the list of objects that list this hash.  Raises
        KeyError if no object in the store has it.'''

        h = self.makehash(hash, strict=False)

        return self._hashes[h]


def _hashfile(fname):
    '''Return the "<algorithm>:<hexdigest>" hash string of the
    contents of the file fname, using the default hash algorithm.'''

    hasher = getattr(hashlib, _defaulthash)()

    # Binary mode so the digest covers the exact bytes on disk, read in
    # blocks so that large files are not pulled into memory at once.
    with open(fname, 'rb') as fp:
        for block in iter(lambda: fp.read(_hashblocksize), b''):
            hasher.update(block)

    return '%s:%s' % (_defaulthash, hasher.hexdigest())


class FileObject(object):
    '''Snapshot of a single directory entry: its name, resolved
    directory, modification time, size, hashes and path based UUID.

    All values are collected once, when the object is created.'''

    def __init__(self, _dir, filename):
        '''Collect the information for the entry filename in the
        directory _dir.

        Raises OSError if the entry cannot be stat'd, and whatever
        open/read raise if it cannot be hashed.'''

        self._dir = os.path.realpath(_dir)
        self._fname = filename

        # XXX make sure this is correct
        self._id = uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
            '/'.join(os.path.split(self._dir) + ( self._fname, )))

        fname = os.path.join(_dir, filename)
        s = os.stat(fname)
        # Naive datetime, expressed in UTC.
        self._mtime = datetime.datetime.utcfromtimestamp(s.st_mtime)
        self._size = s.st_size
        self._hashes = ( _hashfile(fname), )

    @property
    def hashes(self):
        '''The hashes for this file.'''

        # XXX - should return a frozen dict
        return self._hashes

    @property
    def mtime(self):
        '''The last modified date of the file.'''

        return self._mtime

    @property
    def size(self):
        '''The length of the file in bytes.'''

        return self._size

    @property
    def filename(self):
        '''The name of the file.'''

        return self._fname

    @property
    def dir(self):
        '''The directory of the file.'''

        return self._dir

    @property
    def id(self):
        '''The UUID of the path to this file.'''

        return self._id


def enumeratedir(_dir='.'):
    '''Enumerate all the files and directories (not recursive) in
    _dir.

    Returned is a list of FileObjects, in the order os.listdir
    reports the entries.'''

    return [FileObject(_dir, x) for x in os.listdir(_dir)]


class _TestCases(unittest.TestCase):
    '''Tests for this module.  They expect to be run from a directory
    containing the fixtures directory.'''

    def setUp(self):
        # Work on a private copy of the fixture files.
        d = os.path.realpath(tempfile.mkdtemp())
        self.basetempdir = d
        self.tempdir = os.path.join(d, 'subdir')

        shutil.copytree(os.path.join('fixtures', 'testfiles'),
            self.tempdir)

    def tearDown(self):
        shutil.rmtree(self.basetempdir)
        self.tempdir = None

    def test_mdbase(self):
        # unknown type
        self.assertRaises(ValueError, MDBase.create_obj,
            { 'type': 'unknosldkfj' })
        # known type, but missing the common properties
        self.assertRaises(ValueError, MDBase.create_obj,
            { 'type': 'metadata' })

    def test_makehash(self):
        self.assertRaises(ValueError, ObjectStore.makehash, 'slkj')
        # upper case hex digits are rejected when strict
        self.assertRaises(ValueError, ObjectStore.makehash,
            'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA')

        # the algorithm is detected from the length when not strict
        self.assertEqual(ObjectStore.makehash(
            'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e',
            strict=False),
            'sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
        self.assertEqual(ObjectStore.makehash(
            'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
            strict=False),
            'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')

    def test_enumeratedir(self):
        files = enumeratedir(self.tempdir)
        ftest = files[0]
        fname = 'test.txt'

        oldid = ftest.id
        self.assertEqual(ftest.filename, fname)
        self.assertEqual(ftest.dir, self.tempdir)
        # XXX - do we add host information?
        self.assertEqual(ftest.id, uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
            '/'.join(os.path.split(self.tempdir) + ( fname, ))))
        self.assertEqual(ftest.mtime,
            datetime.datetime(2019, 5, 20, 21, 47, 36))
        self.assertEqual(ftest.size, 15)
        self.assertIn('sha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f',
            ftest.hashes)

        # XXX - make sure works w/ relative dirs
        files = enumeratedir(os.path.relpath(self.tempdir))
        self.assertEqual(oldid, files[0].id)

    def test_objectstore(self):
        objst = ObjectStore()

        objst.load(os.path.join('fixtures', 'sample.data.pasn1'))

        objst.loadobj({
            'type': 'metadata',
            'uuid': 'c9a1d1e2-3109-4efd-8948-577dc15e44e7',
            'modified': datetime.datetime(2019, 5, 31, 14, 3, 10),
            'created_by_ref': '867c7563-79ae-435c-a265-9d8509cefac5',
            'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
            'lang': 'en',
        })

        lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
        self.assertEqual(len(lst), 2)

        byid = objst.by_id('3e466e06-45de-4ecc-84ba-2d2a3d970e96')

        self.assertIsInstance(byid, MetaData)
        self.assertIn(byid, lst)

        r = byid

        self.assertEqual(r.uuid, '3e466e06-45de-4ecc-84ba-2d2a3d970e96')
        self.assertEqual(r['dc:author'], 'John-Mark Gurney')

        # Written below the per-test temporary directory so that
        # tearDown removes it even when an assertion fails.
        fname = os.path.join(self.basetempdir, 'testfile.pasn1')
        objst.store(fname)

        with open(fname, 'rb') as fp:
            objs = _asn1coder.loads(fp.read())

        os.unlink(fname)

        self.assertEqual(len(objs), len(objst))

        for i in objs:
            self.assertEqual(objst.by_id(i['uuid']), i)