from . import bencode
from functools import reduce
from hashlib import sha1
import importlib.resources
import itertools
import os
import pathlib
import shutil
import sys
import tempfile
import unittest

_encoding = 'utf-8'


class Storage:
    '''Map the pieces of a multi-file torrent onto files on disk.

    rootpath is the directory containing the torrent's files, files
    is the list of file dictionaries from the torrent's info section
    (each with a 'path' list of encoded components and a 'length'),
    and piecelen is the piece length in bytes.
    '''

    def __init__(self, rootpath, files, piecelen):
        # a non-positive piece length would make _buildindex loop
        # forever, as no piece could ever hold any data
        if piecelen <= 0:
            raise ValueError('piece length must be positive')

        self._rootpath = pathlib.Path(rootpath)
        self._files = files
        self._piecelen = piecelen
        self._buildindex()

    def _filepaths(self):
        '''Iterates over all the files in the torrent.

        Each item is a tuple of:
            the file's dictionary from the torrent (path components
                are still undecoded)
            a pathlib.PurePath for the file
            a pathlib.Path for file on disk
        '''

        for curfile in self._files:
            fname = pathlib.PurePath(
                *(x.decode(_encoding) for x in curfile['path']))
            curfilepath = self._rootpath / fname

            yield curfile, fname, curfilepath

    def allfiles(self):
        '''Iterates over the on disk path of every file.'''

        for _, _, curfilepath in self._filepaths():
            yield curfilepath

    def _buildindex(self):
        '''Build the piece -> file segments and file -> pieces indexes.

        self._pieceindex[n] is the ordered list of segments (dicts
        with file, fname, offset and size) that make up piece n.

        self._fileindex[fname] is the ordered list of piece numbers
        that contain at least one byte of the file.  Zero length
        files are present with an empty list.
        '''

        self._pieceindex = []
        self._fileindex = {}
        files = self._filepaths()
        left = 0
        curfile = None
        curfileoff = 0

        while True:
            if curfile is None or curfileoff == curfile['length']:
                # next file
                try:
                    curfile, fname, curfilepath = next(files)
                except StopIteration:
                    break

                curfileoff = 0

                # list every file, even ones that end up w/o pieces
                filepieces = self._fileindex.setdefault(fname, [])

            if left == 0:
                # previous piece is full, start a new one
                current = []
                self._pieceindex.append(current)
                left = self._piecelen

            sz = min(curfile['length'] - curfileoff, left)

            current.append(dict(file=curfilepath, fname=fname,
                offset=curfileoff, size=sz))

            # Record the piece for every file that has data in it,
            # not just the file that happened to start the piece,
            # otherwise a bad piece would not mark the files that
            # begin part way through it.
            pieceno = len(self._pieceindex) - 1
            if sz and (not filepieces or filepieces[-1] != pieceno):
                filepieces.append(pieceno)

            curfileoff += sz
            left -= sz

    def filepieces(self):
        '''Return the (fname, list of piece numbers) pairs.'''

        return self._fileindex.items()

    def filesforpiece(self, idx):
        '''Iterates over the on disk paths that make up piece idx.'''

        for segment in self._pieceindex[idx]:
            yield segment['file']

    def apply_piece(self, idx, fun):
        '''Call fun with each chunk of data of piece idx, in order.

        Each segment's file is opened and read from disk.  A file
        that is shorter than expected produces a short (possibly
        empty) chunk.  A file that cannot be opened raises the
        exception from open.
        '''

        for segment in self._pieceindex[idx]:
            with open(segment['file'], 'rb') as fp:
                fp.seek(segment['offset'])
                fun(fp.read(segment['size']))


def validate(torrent, basedir):
    '''Verify the files of torrent that are located under basedir.

    torrent is a decoded multi-file torrent, and the files are
    expected to be in the directory named by the torrent, inside of
    basedir.

    Returns a tuple of two sets of pathlib.Path objects, the files
    that are good, and the files that are bad.  A file is bad if any
    of the pieces that contain part of it fails its SHA-1 hash.
    '''

    info = torrent['info']

    basedir = pathlib.Path(basedir)
    torrentdir = basedir / info['name'].decode(_encoding)

    stor = Storage(torrentdir, info['files'], info['piece length'])

    pieces = info['pieces']
    hashlen = 20    # size of a SHA-1 digest

    valid = []
    for num, start in enumerate(range(0, len(pieces), hashlen)):
        hasher = sha1()
        stor.apply_piece(num, hasher.update)

        valid.append(hasher.digest() == pieces[start:start + hashlen])

    # if any piece of a file is bad, it's bad
    allfiles = set(stor.allfiles())

    badfiles = { torrentdir / x for x, y in stor.filepieces() if
        not all(valid[i] for i in y) }

    return allfiles - badfiles, badfiles


class _TestCases(unittest.TestCase):
    dirname = 'somedir'

    # file contents for somedir.torrent
    origfiledata = {
        'filea.txt': b'foo\n',
        'fileb.txt': b'bar\n',
        'filec.txt': b'bleha\n',
        'filed.txt': b'somehow\n',
        'filee.txt': b'nowab\n',
        'filef/filef.txt': b'\n',
    }

    # some munging to make some files bad
    badfiles = {
        'filea.txt': b'',
        'filec.txt': b'\x00\x00\x00\x00a\n',
        'filee.txt': b'no',
    }

    @staticmethod
    def _loadtorrent(name):
        '''Decode and return the torrent fixture named name.'''

        tor = importlib.resources.files(__name__)
        tor = tor / 'fixtures' / name
        with tor.open('rb') as fp:
            return bencode.bdecode(fp.read())

    def setUp(self):
        # load the fixture first, tearDown is not run when setUp
        # fails, so this keeps from leaking the temporary directory
        self.torrent = self._loadtorrent('somedir.torrent')

        d = pathlib.Path(tempfile.mkdtemp()).resolve()

        self.basetempdir = d

        self.oldcwd = os.getcwd()

        os.chdir(d)

    def tearDown(self):
        # leave the directory before removing it, not every platform
        # allows removing the current working directory
        os.chdir(self.oldcwd)

        shutil.rmtree(self.basetempdir)

    @staticmethod
    def make_files(dname, fdict):
        '''Create the files of fdict under the directory dname.

        fdict maps a POSIX style relative path to the file's
        contents.  Missing parent directories are created.
        '''

        dname = pathlib.Path(dname)
        for k, v in fdict.items():
            k = dname / pathlib.PurePosixPath(k)
            k.parent.mkdir(parents=True, exist_ok=True)
            with open(k, 'wb') as fp:
                fp.write(v)

    def test_completeverif(self):
        sd = self.basetempdir / self.dirname
        sd.mkdir()

        self.make_files(sd, self.origfiledata)

        validate(self.torrent, self.basetempdir)

        # that utf-8 encoded names work

        sd = self.basetempdir / 'thai'
        sd.mkdir()

        self.make_files(sd, { 'thai - สวัสดี.txt': b'hello\n' })

        torrent = self._loadtorrent('thai.torrent')

        validate(torrent, self.basetempdir)

    def test_verification(self):
        # Testing for "missing" files
        # piece size 2 (aka 4 bytes)
        # empty file of 4 bytes 'foo\n'
        # complete file of 4 bytes 'bar\n'
        # partial missing file, 6 bytes, last two correct 'bleha\n'
        # complete file of 8 bytes (multiple pieces) 'somehow\n'
        # partial missing file, starting w/ 2 bytes, length 6 'nowab\n'
        # complete file (length 1) '\n'

        missingfiles = self.origfiledata.copy()

        missingfiles.update(self.badfiles)

        sd = self.basetempdir / self.dirname
        sd.mkdir()

        self.make_files(sd, missingfiles)

        val, inval = validate(self.torrent, self.basetempdir)

        self.assertEqual(set(val), { sd / x for x in
            missingfiles.keys() if x not in self.badfiles })
        self.assertEqual(set(inval), { sd / x for x in
            self.badfiles.keys() })

    def test_midpiecefile(self):
        # a file that starts part way through a piece must be marked
        # bad when that piece is bad

        torrent = dict(info={
            'name': b'midpiece',
            'piece length': 4,
            'files': [
                dict(path=[ b'a.txt' ], length=3),
                dict(path=[ b'b.txt' ], length=1),
            ],
            'pieces': sha1(b'aaab').digest(),
        })

        sd = self.basetempdir / 'midpiece'
        sd.mkdir()

        self.make_files(sd, { 'a.txt': b'aaa', 'b.txt': b'X' })

        val, inval = validate(torrent, self.basetempdir)

        self.assertEqual(set(val), set())
        self.assertEqual(set(inval), { sd / 'a.txt', sd / 'b.txt' })

    def test_zerolengthfile(self):
        # a zero length file on a piece boundary has no pieces, and
        # must not refer to a piece that has no hash

        torrent = dict(info={
            'name': b'zerolen',
            'piece length': 4,
            'files': [
                dict(path=[ b'a.txt' ], length=4),
                dict(path=[ b'empty.txt' ], length=0),
            ],
            'pieces': sha1(b'aaaa').digest(),
        })

        sd = self.basetempdir / 'zerolen'
        sd.mkdir()

        self.make_files(sd, { 'a.txt': b'aaaa', 'empty.txt': b'' })

        val, inval = validate(torrent, self.basetempdir)

        self.assertEqual(set(val), { sd / 'a.txt', sd / 'empty.txt' })
        self.assertEqual(set(inval), set())