@@ -17,6 +17,7 @@ import os.path
import pasn1
import shutil
import string
import sys
import tempfile
import unittest
import uuid
@@ -119,7 +120,7 @@ class MDBase(object):
repr(ty))
def new_version(self, *args):
'''For each k, v par i, add the property k as an additional one
'''For each k, v pair , add the property k as an additional one
(or new one if first), with the value v.'''
obj = copy.deepcopy(self._obj)
@@ -452,7 +453,7 @@ class ObjectStore(object):
h = self.makehash(hash, strict=False)
return self._hashes[h]
def by_file(self, fname):
def by_file(self, fname, types=('metadata', ) ):
'''Return a metadata object for the file named fname.'''
fid = FileObject.make_id(fname)
@@ -463,11 +464,12 @@ class ObjectStore(object):
fobj = FileObject.from_file(fname, self._created_by_ref)
self.loadobj(fobj)
# XXX - does not verify
for i in fobj.hashes:
j = self.by_hash(i)
# Filter out non-metadata objects
j = [ x for x in j if x.type == 'metadata' ]
j = [ x for x in j if x.type in types ]
if j:
return j
else:
@@ -505,6 +507,7 @@ class FileObject(MDBase):
@classmethod
def from_file(cls, filename, created_by_ref):
s = os.stat(filename)
# XXX - add host uuid?
obj = {
'dir': os.path.dirname(filename),
'created_by_ref': created_by_ref,
@@ -525,41 +528,180 @@ def enumeratedir(_dir, created_by_ref):
return [FileObject.from_file(os.path.join(_dir, x), created_by_ref) for x in os.listdir(_dir)]
def get_objstore(options):
persona = get_persona(options)
storefname = os.path.expanduser('~/.medashare_store.pasn1')
try:
objstr = ObjectStore.load(storefname)
except FileNotFoundError:
objstr = ObjectStore(persona.get_identity().uuid)
return persona, objstr
def write_objstore(options, objstr):
storefname = os.path.expanduser('~/.medashare_store.pasn1')
objstr.store(storefname)
def get_persona(options):
identfname = os.path.expanduser('~/.medashare_identity.pasn1')
try:
persona = Persona.load(identfname)
except FileNotFoundError:
print('ERROR: Identity not created, create w/ -g.', file=sys.stderr)
sys.exit(1)
return persona
def cmd_genident(options):
identfname = os.path.expanduser('~/.medashare_identity.pasn1')
if os.path.exists(identfname):
print('Error: Identity already created.', file=sys.stderr)
sys.exit(1)
persona = Persona()
persona.generate_key()
persona.new_version(*(x.split('=', 1) for x in options.tagvalue))
persona.store(identfname)
def cmd_ident(options):
identfname = os.path.expanduser('~/.medashare_identity.pasn1')
persona = Persona.load(identfname)
persona.new_version(*(x.split('=', 1) for x in options.tagvalue))
persona.store(identfname)
def cmd_pubkey(options):
identfname = os.path.expanduser('~/.medashare_identity.pasn1')
persona = Persona.load(identfname)
print(persona.get_pubkey().decode('ascii'))
def cmd_modify(options):
persona, objstr = get_objstore(options)
props = [[ x[0] ] + x[1:].split('=', 1) for x in options.modtagvalues]
if any(x[0] not in ('+', '-') for x in props):
print('ERROR: tag needs to start w/ a "+" (add) or a "-" (remove).', file=sys.stderr)
sys.exit(1)
badtags = list(x[1] for x in props if x[1] in (MDBase._common_names | MDBase._common_optional))
if any(badtags):
print('ERROR: invalid tag(s): %s.' % repr(badtags), file=sys.stderr)
sys.exit(1)
adds = [ x[1:] for x in props if x[0] == '+' ]
if any((len(x) != 2 for x in adds)):
print('ERROR: invalid tag, needs an "=".', file=sys.stderr)
sys.exit(1)
dels = [ x[1:] for x in props if x[0] == '-' ]
for i in options.files:
try:
objs = objstr.by_file(i)
except KeyError:
fobj = objstr
objs = [ persona.by_file(i) ]
for j in objs:
# make into key/values
obj = j.__to_dict__()
# delete tags
for k in dels:
try:
key, v = k
except ValueError:
del obj[k[0]]
else:
obj[key].remove(v)
# add tags
for k, v in adds:
obj.setdefault(k, []).append(v)
del obj['modified']
nobj = MDBase.create_obj(obj)
objstr.loadobj(nobj)
write_objstore(options, objstr)
def cmd_list(options):
persona, objstr = get_objstore(options)
for i in options.files:
try:
for j in objstr.by_file(i):
#print >>sys.stderr, `j._obj`
for k, v in _iterdictlist(j):
print('%s:\t%s' % (k, v))
except (KeyError, FileNotFoundError):
print('ERROR: file not found: %s' % repr(i), file=sys.stderr)
sys.exit(1)
def main():
from optparse import OptionParser
import sys
parser = OptionParser()
parser.add_option('-a', action='append', dest='add',
default=[], help='add the arg as metadata for files, tag=value')
parser.add_option('-d', action='append', dest='delete',
default=[], help='delete the arg as metadata from files. Either specify tag, and all tags are removed, or specify tag=value and that specific tag will be removed.')
parser.add_option('-g', action='store_true', dest='generateident',
default=False, help='generate an identity')
parser.add_option('-i', action='store_true', dest='updateident',
default=False, help='update the identity')
parser.add_option('-l', action='store_true', dest='list',
default=False, help='list metadata')
parser.add_option('-p', action='store_true', dest='printpub',
default=False, help='Print the public key of the identity')
options, args = parser.parse_args()
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--db', '-d', type=str, help='base name for storage')
subparsers = parser.add_subparsers(title='subcommands',
description='valid subcommands', help='additional help')
parser_gi = subparsers.add_parser('genident', help='generate identity')
parser_gi.add_argument('tagvalue', nargs='+',
help='add the arg as metadata for the identity, tag=[value]')
parser_gi.set_defaults(func=cmd_genident)
parser_i = subparsers.add_parser('ident', help='update identity')
parser_i.add_argument('tagvalue', nargs='+',
help='add the arg as metadata for the identity, tag=[value]')
parser_i.set_defaults(func=cmd_ident)
parser_pubkey = subparsers.add_parser('pubkey', help='print public key of identity')
parser_pubkey.set_defaults(func=cmd_pubkey)
# used so that - isn't treated as an option
parser_mod = subparsers.add_parser('modify', help='modify tags on file(s)', prefix_chars='@')
parser_mod.add_argument('modtagvalues', nargs='+',
help='add (+) or delete (-) the tag=[value], for the specified files')
parser_mod.add_argument('files', nargs='+',
help='files to modify')
parser_mod.set_defaults(func=cmd_modify)
parser_list = subparsers.add_parser('list', help='list tags on file(s)')
parser_list.add_argument('files', nargs='+',
help='files to modify')
parser_list.set_defaults(func=cmd_list)
options = parser.parse_args()
fun = options.func
fun(options)
return
# this is shared between generateident and add
addprops = [x.split('=', 1) for x in options.add]
if options.generateident or options.updateident or options.printpub:
identfname = os.path.expanduser('~/.medashare_identity.pasn1')
if any((len(x) != 2 for x in addprops)):
print('ERROR: invalid tag, needs an "=".', file=sys.stderr)
sys.exit(1)
if options.generateident and os.path.exists(identfname):
print('Error: Identity already created.', file=sys.stderr)
sys.exit(1)
if options.updateident or options.printpub:
identfname = os.path.expanduser('~/.medashare_identity.pasn1')
if options.generateident:
persona = Persona()
persona.generate_key()
else:
persona = Persona.load(identfname)
persona = Persona.load(identfname)
if options.printpub:
print(persona.get_pubkey().decode('ascii'))
@@ -570,18 +712,17 @@ def main():
return
storefname = os.path.expanduser('~/.medashare_store.pasn1')
import sys
#print >>sys.stderr, `storefname`
identfname = os.path.expanduser('~/.medashare_identity.pasn1')
try:
objstr = ObjectStore.load(store fname)
persona = Persona.load(ident fname)
except FileNotFoundError:
identfname = os.path.expanduser('~/.medashare_identity.pasn1')
try:
persona = Persona.load(identfname)
except FileNotFoundError:
print('ERROR: Identity not created, create w/ -g.', file=sys.stderr)
sys.exit(1)
print('ERROR: Identity not created, create w/ -g.', file=sys.stderr)
sys.exit(1)
try:
objstr = ObjectStore.load(storefname)
except FileNotFoundError:
objstr = ObjectStore(persona.get_identity().uuid)
if options.list:
@@ -596,7 +737,13 @@ def main():
sys.exit(1)
elif options.add:
for i in args:
for j in objstr.by_file(i):
try:
objs = objstr.by_file(i)
except KeyError:
fobj = objstr
objs = [ persona.by_file(i) ]
for j in objs:
nobj = j.new_version(*addprops)
objstr.loadobj(nobj)
elif options.delete:
@@ -983,17 +1130,18 @@ class _TestCases(unittest.TestCase):
testfname = os.path.join(self.tempdir, 'test.txt')
newtestfname = os.path.join(self.tempdir, 'newfile.txt')
import sys
import io
import itertools
real_stderr = sys.stderr
with mock.patch('os.path.expanduser', side_effect=expandusermock) \
as eu, mock.patch('medashare.cli.open') as op:
# that when opening the store and identity fails
op.side_effect = FileNotFoundError
# and there is no identity
with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', '- l', ]) as argv:
with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', 'list ', 'afile' ]) as argv:
with self.assertRaises(SystemExit) as cm:
main()
@@ -1007,7 +1155,7 @@ class _TestCases(unittest.TestCase):
with mock.patch('os.path.expanduser', side_effect=expandusermock) \
as eu:
# that generating a new identity
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '-g', '-a ', 'name=A Test User' ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'genident ', 'name=A Test User' ]) as argv:
main()
# does not output anything
@@ -1024,7 +1172,7 @@ class _TestCases(unittest.TestCase):
self.assertEqual(pident.name, 'A Test User')
# that when generating an identity when one already exists
with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', '-g', '-a ', 'name=A Test User' ]) as argv:
with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', 'genident ', 'name=A Test User' ]) as argv:
# that it exits
with self.assertRaises(SystemExit) as cm:
main()
@@ -1040,7 +1188,7 @@ class _TestCases(unittest.TestCase):
eu.assert_called_with('~/.medashare_identity.pasn1')
# that when updating the identity
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '-i', '-a ', 'name=Changed Name' ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'ident ', 'name=Changed Name' ]) as argv:
main()
# it doesn't output anything
@@ -1065,7 +1213,7 @@ class _TestCases(unittest.TestCase):
self.assertTrue(persona.verify(nident))
# that when asked to print the public key
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '- p' ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'pubkey ' ]) as argv:
main()
# the correct key is printed
@@ -1076,7 +1224,7 @@ class _TestCases(unittest.TestCase):
eu.assert_called_with('~/.medashare_identity.pasn1')
# that when a new file is printed
with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', '- l', newtestfname ]) as argv:
with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', 'list ', newtestfname ]) as argv:
# that it exits
with self.assertRaises(SystemExit) as cm:
main()
@@ -1088,49 +1236,59 @@ class _TestCases(unittest.TestCase):
self.assertEqual(stderr.getvalue(),
'ERROR: file not found: %s\n' % repr(newtestfname))
# that when a tag is incomplete
with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', 'modify', '+tag', newtestfname ]) as argv:
# that it exits
with self.assertRaises(SystemExit) as cm:
main()
# with error code 1
self.assertEqual(cm.exception.code, 1)
# and outputs an error message
self.assertEqual(stderr.getvalue(),
'ERROR: invalid tag, needs an "=".\n')
# that when a new file has a tag added
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '-a', 'tag', newtestfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'modify', '+tag= ', newtestfname ]) as argv:
main()
# nothing is printed
self.assertEqual(stdout.getvalue(), '');
eu.assert_called_with('~/.medashare_store.pasn1')
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '-l', testfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'list', testfname ]) as argv:
main()
self.assertEqual(stdout.getvalue(),
'dc:creator:\tJohn-Mark Gurney\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
eu.assert_called_with('~/.medashare_store.pasn1')
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '-a', 'dc:creator=Another user', '-a', ' foo=bar=baz', testfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'modify', '+dc:creator=Another user', '+ foo=bar=baz', testfname ]) as argv:
main()
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '- l', testfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'list ', testfname ]) as argv:
main()
self.assertEqual(stdout.getvalue(),
'dc:creator:\tAnother user\ndc:creator:\tJohn-Mark Gurney\nfoo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '-d', ' dc:creator', testfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'modify', '- dc:creator', testfname ]) as argv:
main()
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '- l', testfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'list ', testfname ]) as argv:
main()
self.assertEqual(stdout.getvalue(),
'foo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '-a', ' foo=bleh', testfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'modify', '+ foo=bleh', testfname ]) as argv:
main()
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '- l', testfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'list ', testfname ]) as argv:
main()
self.assertEqual(stdout.getvalue(),
'foo:\tbar=baz\nfoo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '-d', ' foo=bar=baz', testfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'modify', '- foo=bar=baz', testfname ]) as argv:
main()
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '- l', testfname ]) as argv:
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', 'list ', testfname ]) as argv:
main()
self.assertEqual(stdout.getvalue(),
'foo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
@@ -1140,6 +1298,7 @@ class _TestCases(unittest.TestCase):
as eu, mock.patch('medashare.cli.open') as op:
# that when the store fails
def open_repl(fname, mode):
#print('or:', repr(fname), repr(mode), file=sys.stderr)
self.assertIn(mode, ('rb', 'wb'))
if fname == identfname or mode == 'wb':
@@ -1151,8 +1310,14 @@ class _TestCases(unittest.TestCase):
op.side_effect = open_repl
# and there is no store
with mock.patch('sys.stdout', io.StringIO()) as stdout, mock.patch('sys.argv', [ 'progname', '-l', ]) as argv:
main()
with mock.patch('sys.stderr', io.StringIO()) as stderr, mock.patch('sys.argv', [ 'progname', 'list', 'foo', ]) as argv:
# that it exits
with self.assertRaises(SystemExit) as cm:
main()
# does not output anything
self.assertEqual(stdout.getvalue(), '')
# with error code 1
self.assertEqual(cm.exception.code, 1)
# and outputs an error message
self.assertEqual(stderr.getvalue(),
'ERROR: file not found: \'foo\'\n')