|
-
- from . import bencode
- from functools import reduce
- from hashlib import sha1
- import importlib.resources
- import itertools
- import os
- import pathlib
- import shutil
- import sys
- import tempfile
- import unittest
-
_encoding = 'utf-8'


class Storage:
    '''Maps the files listed in a torrent onto its fixed-size pieces.

    The files are treated as one continuous byte stream, in the order
    given, which is cut into pieces of piecelen bytes.  Two indexes
    are built at construction time: which file segments make up each
    piece, and which pieces each file has data in.
    '''

    def __init__(self, rootpath, files, piecelen):
        '''Build the piece/file indexes.

        rootpath -- directory that the files are located under
        files -- iterable of mappings, each having a 'path' (sequence
            of bytes path components) and a 'length' (size in bytes)
        piecelen -- number of bytes in each piece

        Raises ValueError if there is data to place and piecelen is
        not positive.
        '''

        self._rootpath = pathlib.Path(rootpath)
        self._files = files
        self._piecelen = piecelen

        self._buildindex()

    def _filepaths(self):
        '''Iterates over all the files in the torrent.

        Each item is a tuple of:
            the file's entry from the files list (as passed in)
            a pathlib.PurePath for the file (relative, decoded)
            a pathlib.Path for file on disk (under rootpath)
        '''

        for curfile in self._files:
            fname = pathlib.PurePath(
                *(x.decode(_encoding) for x in curfile['path']))
            curfilepath = self._rootpath / fname

            yield curfile, fname, curfilepath

    def allfiles(self):
        '''Iterates over the on disk pathlib.Path of every file.'''

        for _, _, curfilepath in self._filepaths():
            yield curfilepath

    def _buildindex(self):
        '''Populate self._pieceindex and self._fileindex.

        self._pieceindex[i] is the list of segments making up piece i,
        each a dict with keys file (path on disk), fname (relative
        path), offset (byte offset into the file) and size (number of
        bytes).

        self._fileindex maps each file's relative path to the ordered
        list of indexes of every piece that holds any of its data.  A
        zero length file has no data, so it maps to an empty list and
        is not part of any piece.
        '''

        self._pieceindex = []
        self._fileindex = {}

        current = None  # segment list of the piece being filled
        left = 0        # bytes still free in the current piece

        for curfile, fname, curfilepath in self._filepaths():
            # Register every file, even ones w/o data, so that
            # filepieces() reports all of them.
            filepieces = self._fileindex.setdefault(fname, [])

            length = curfile['length']
            curfileoff = 0

            while curfileoff < length:
                if left == 0:
                    if self._piecelen <= 0:
                        # would otherwise loop forever
                        raise ValueError(
                            'piece length must be positive, got %r' %
                            (self._piecelen,))

                    # only start a piece when there is data to put
                    # in it
                    current = []
                    self._pieceindex.append(current)
                    left = self._piecelen

                # A file is tied to every piece it has data in, not
                # just the pieces that happen to start inside it.
                pieceidx = len(self._pieceindex) - 1
                if not filepieces or filepieces[-1] != pieceidx:
                    filepieces.append(pieceidx)

                sz = min(length - curfileoff, left)

                current.append(dict(file=curfilepath, fname=fname,
                    offset=curfileoff, size=sz))

                curfileoff += sz
                left -= sz

    def filepieces(self):
        '''Returns the (relative path, list of piece indexes) pairs
        for every file.'''

        return self._fileindex.items()

    def filesforpiece(self, idx):
        '''Iterates over the on disk path of each segment in piece idx.

        A file is yielded once per segment it has in the piece.
        '''

        for x in self._pieceindex[idx]:
            yield x['file']

    def apply_piece(self, idx, fun):
        '''Calls fun with the data of each segment of piece idx, in
        order.

        Each segment's file is opened for reading, so any error from
        open (such as the file not existing) propagates.  If a file
        is shorter than expected, fun is passed fewer bytes than the
        segment's size.
        '''

        for i in self._pieceindex[idx]:
            with open(i['file'], 'rb') as fp:
                fp.seek(i['offset'])
                fun(fp.read(i['size']))
-
def validate(torrent, basedir):
    '''Checks the files of a multi-file torrent against its piece
    hashes.

    torrent -- decoded torrent; info's 'name', 'files',
        'piece length' and 'pieces' entries are used
    basedir -- directory containing the torrent's directory (the one
        named by info's 'name')

    Returns a tuple of two sets of pathlib.Path: the files that
    validated, and the files that did not.  A file is bad if any
    piece associated with it fails its SHA-1 check.

    Raises ValueError if the length of 'pieces' is not a multiple of
    the 20 byte SHA-1 digest size.  Errors from reading the files
    (e.g. a file that does not exist) propagate.
    '''

    hashlen = 20    # size of a SHA-1 digest

    info = torrent['info']

    torrentdir = pathlib.Path(basedir) / info['name'].decode(_encoding)

    stor = Storage(torrentdir, info['files'], info['piece length'])

    pieces = info['pieces']
    if len(pieces) % hashlen:
        # fail before doing any hashing instead of part way through
        raise ValueError(
            'length of pieces (%d) is not a multiple of %d' %
            (len(pieces), hashlen))

    # valid[n] is True iff piece n matches its expected digest
    valid = []
    for off in range(0, len(pieces), hashlen):
        hasher = sha1()

        stor.apply_piece(len(valid), hasher.update)

        valid.append(hasher.digest() == pieces[off:off + hashlen])

    # if any piece of a file is bad, it's bad
    allfiles = set(stor.allfiles())

    badfiles = { torrentdir / fname for fname, idxs in stor.filepieces()
        if not all(valid[i] for i in idxs) }

    return allfiles - badfiles, badfiles
-
class _TestCases(unittest.TestCase):
    '''Tests for validate, run against the torrents in fixtures.

    Each test runs with the current directory set to a fresh
    temporary directory, which is removed afterwards.
    '''

    dirname = 'somedir'

    # file contents for somedir.torrent
    origfiledata = {
        'filea.txt': b'foo\n',
        'fileb.txt': b'bar\n',
        'filec.txt': b'bleha\n',
        'filed.txt': b'somehow\n',
        'filee.txt': b'nowab\n',
        'filef/filef.txt': b'\n',
    }

    # some munging to make some files bad
    badfiles = {
        'filea.txt': b'',
        'filec.txt': b'\x00\x00\x00\x00a\n',
        'filee.txt': b'no',
    }

    @staticmethod
    def _load_torrent(name):
        '''Returns the decoded torrent named name from fixtures.'''

        tor = importlib.resources.files(__name__)
        tor = tor / 'fixtures' / name
        with tor.open('rb') as fp:
            return bencode.bdecode(fp.read())

    def setUp(self):
        # Load the fixture first, so that a failure here does not
        # leave a temporary directory behind (tearDown is not run
        # when setUp fails).
        self.torrent = self._load_torrent('somedir.torrent')

        d = pathlib.Path(tempfile.mkdtemp()).resolve()

        self.basetempdir = d

        self.oldcwd = os.getcwd()

        os.chdir(d)

    def tearDown(self):
        # Leave the temporary directory before removing it, not all
        # platforms allow removing the current directory.
        os.chdir(self.oldcwd)

        shutil.rmtree(self.basetempdir)

    @staticmethod
    def make_files(dname, fdict):
        '''Creates the files in fdict under the directory dname.

        fdict maps a POSIX style relative path to the bytes to write.
        Parent directories are created as needed.
        '''

        dname = pathlib.Path(dname)
        for k, v in fdict.items():
            k = dname / pathlib.PurePosixPath(k)
            k.parent.mkdir(parents=True, exist_ok=True)
            with open(k, 'wb') as fp:
                fp.write(v)

    def test_completeverif(self):
        # only checks that validation runs to completion
        sd = self.basetempdir / self.dirname
        sd.mkdir()

        self.make_files(sd, self.origfiledata)

        validate(self.torrent, self.basetempdir)

        # encoded names

        sd = self.basetempdir / 'thai'
        sd.mkdir()

        self.make_files(sd, { 'thai - สวัสดี.txt': b'hello\n'
            })

        torrent = self._load_torrent('thai.torrent')

        validate(torrent, self.basetempdir)

    def test_verification(self):
        # Testing for "missing" files
        #   piece size 2 (aka 4 bytes)
        #   empty file of 4 bytes 'foo\n'
        #   complete file of 4 bytes 'bar\n'
        #   partial missing file, 6 bytes, last two correct 'bleha\n'
        #   complete file of 8 bytes (multiple pieces) 'somehow\n'
        #   partial missing file, starting w/ 2 bytes, length 6 'nowab\n'
        #   complete file (length 1) '\n'

        missingfiles = self.origfiledata.copy()

        missingfiles.update(self.badfiles)

        sd = self.basetempdir / self.dirname
        sd.mkdir()

        self.make_files(sd, missingfiles)

        val, inval = validate(self.torrent, self.basetempdir)

        self.assertEqual(set(val), { sd / x for x in
            missingfiles.keys() if x not in self.badfiles })
        self.assertEqual(set(inval), { sd / x for x in
            self.badfiles.keys() })
|