|
@@ -21,6 +21,11 @@ _escapes = '*?[]' |
|
|
def glob_escape(s): |
|
|
def glob_escape(s): |
|
|
return ''.join(x if x not in _escapes else '[%s]' % x for x in s) |
|
|
return ''.join(x if x not in _escapes else '[%s]' % x for x in s) |
|
|
|
|
|
|
|
|
|
|
|
def roundup(x, y): |
|
|
|
|
|
'''Round up x to the next multiple of y.''' |
|
|
|
|
|
|
|
|
|
|
|
return (x + y - 1) // y |
|
|
|
|
|
|
|
|
class Storage: |
|
|
class Storage: |
|
|
'''A class to help read pieces of a torrent. |
|
|
'''A class to help read pieces of a torrent. |
|
|
''' |
|
|
''' |
|
@@ -30,13 +35,21 @@ class Storage: |
|
|
rootpath - path to the dir of torrent files are in |
|
|
rootpath - path to the dir of torrent files are in |
|
|
files - the files dictionary from the torrent info key |
|
|
files - the files dictionary from the torrent info key |
|
|
piecelen - piece length from the torren info key |
|
|
piecelen - piece length from the torren info key |
|
|
|
|
|
|
|
|
|
|
|
If files is None, then rootpath points at the single file. |
|
|
''' |
|
|
''' |
|
|
|
|
|
|
|
|
self._rootpath = pathlib.Path(rootpath) |
|
|
self._rootpath = pathlib.Path(rootpath) |
|
|
self._files = files |
|
|
self._files = files |
|
|
self._piecelen = piecelen |
|
|
self._piecelen = piecelen |
|
|
|
|
|
|
|
|
self._buildindex() |
|
|
|
|
|
|
|
|
if files is None: |
|
|
|
|
|
# get length |
|
|
|
|
|
sz = self._rootpath.stat().st_size |
|
|
|
|
|
piececnt = roundup(sz, piecelen) |
|
|
|
|
|
self._pieceindex = [ [ dict(file=self._rootpath, offset=x * piecelen, size=piecelen if x < piececnt - 1 else sz - piececnt * x) ] for x in range(piececnt) ] |
|
|
|
|
|
else: |
|
|
|
|
|
self._buildindex() |
|
|
|
|
|
|
|
|
def _filepaths(self): |
|
|
def _filepaths(self): |
|
|
'''Iterates over all the files in the torrent. |
|
|
'''Iterates over all the files in the torrent. |
|
@@ -148,7 +161,8 @@ def validate(torrent, basedir): |
|
|
|
|
|
|
|
|
torrentdir = basedir / info['name'].decode(_encoding) |
|
|
torrentdir = basedir / info['name'].decode(_encoding) |
|
|
|
|
|
|
|
|
stor = Storage(torrentdir, info['files'], info['piece length']) |
|
|
|
|
|
|
|
|
files = info.get('files', None) |
|
|
|
|
|
stor = Storage(torrentdir, files, info['piece length']) |
|
|
|
|
|
|
|
|
pieces = info['pieces'] |
|
|
pieces = info['pieces'] |
|
|
piecescnt = len(pieces) // 20 |
|
|
piecescnt = len(pieces) // 20 |
|
@@ -164,6 +178,15 @@ def validate(torrent, basedir): |
|
|
else: |
|
|
else: |
|
|
valid[num] = False |
|
|
valid[num] = False |
|
|
|
|
|
|
|
|
|
|
|
if files is None: |
|
|
|
|
|
# single file |
|
|
|
|
|
f, e = set([ torrentdir ]), set() |
|
|
|
|
|
|
|
|
|
|
|
if not all(valid): |
|
|
|
|
|
f, e = e, f |
|
|
|
|
|
|
|
|
|
|
|
return f,e |
|
|
|
|
|
|
|
|
# if any piece of a file is bad, it's bad |
|
|
# if any piece of a file is bad, it's bad |
|
|
allfiles = set(stor.allfiles()) |
|
|
allfiles = set(stor.allfiles()) |
|
|
|
|
|
|
|
@@ -197,11 +220,13 @@ class _TestCases(unittest.TestCase): |
|
|
|
|
|
|
|
|
self.basetempdir = d |
|
|
self.basetempdir = d |
|
|
|
|
|
|
|
|
tor = importlib.resources.files(__name__) |
|
|
|
|
|
tor = tor / 'fixtures' / 'somedir.torrent' |
|
|
|
|
|
|
|
|
fixtures = importlib.resources.files(__name__) / 'fixtures' |
|
|
|
|
|
tor = fixtures / 'somedir.torrent' |
|
|
with tor.open('rb') as fp: |
|
|
with tor.open('rb') as fp: |
|
|
self.torrent = bencode.bdecode(fp.read()) |
|
|
self.torrent = bencode.bdecode(fp.read()) |
|
|
|
|
|
|
|
|
|
|
|
self.fixtures = fixtures |
|
|
|
|
|
|
|
|
self.oldcwd = os.getcwd() |
|
|
self.oldcwd = os.getcwd() |
|
|
|
|
|
|
|
|
os.chdir(d) |
|
|
os.chdir(d) |
|
@@ -258,6 +283,21 @@ class _TestCases(unittest.TestCase): |
|
|
]: |
|
|
]: |
|
|
self.assertTrue(fnmatch.fnmatch(i, glob_escape(i))) |
|
|
self.assertTrue(fnmatch.fnmatch(i, glob_escape(i))) |
|
|
|
|
|
|
|
|
|
|
|
def test_validate_file_single(self): |
|
|
|
|
|
sd = self.basetempdir / 'anotherdir' |
|
|
|
|
|
sd.mkdir() |
|
|
|
|
|
|
|
|
|
|
|
self.make_files(sd, self.origfiledata) |
|
|
|
|
|
|
|
|
|
|
|
shutil.copy(self.fixtures / 'filed.txt.torrent', self.basetempdir) |
|
|
|
|
|
|
|
|
|
|
|
tor = self.basetempdir / 'filed.txt.torrent' |
|
|
|
|
|
|
|
|
|
|
|
good, bad = validate_file(tor) |
|
|
|
|
|
|
|
|
|
|
|
self.assertFalse(bad) |
|
|
|
|
|
self.assertEqual(good, { sd / 'filed.txt' }) |
|
|
|
|
|
|
|
|
def test_verification(self): |
|
|
def test_verification(self): |
|
|
# Testing for "missing" files |
|
|
# Testing for "missing" files |
|
|
# piece size 2 (aka 4 bytes) |
|
|
# piece size 2 (aka 4 bytes) |
|
|