from . import bencode
from functools import reduce
from hashlib import sha1
import importlib.resources
import itertools
import os
import pathlib
import shutil
import sys
import tempfile
import unittest

# Number of bytes each SHA-1 digest occupies in the torrent's 'pieces' string.
_DIGEST_LEN = 20


class Storage:
    """Map the pieces of a multi-file torrent onto byte ranges of files.

    The files are treated as one contiguous stream, in the order given,
    that is cut into pieces of ``piecelen`` bytes.  A piece may therefore
    span several files, and a file may span several pieces.
    """

    def __init__(self, rootpath, files, piecelen, encoding='us-ascii'):
        """Build the piece index.

        rootpath -- directory the file paths are relative to.
        files -- iterable of dicts, each with a 'path' key (sequence of
            bytes path components) and a 'length' key (size in bytes).
        piecelen -- number of bytes per piece; must be positive.
        encoding -- codec used to decode the path components.

        Raises ValueError if piecelen is not positive.
        """

        # A non-positive piece length would make _buildindex loop forever,
        # since no bytes would ever be consumed from the current file.
        if piecelen <= 0:
            raise ValueError(
                'piece length must be positive, got %r' % (piecelen,))

        self._rootpath = pathlib.Path(rootpath)
        self._files = files
        self._piecelen = piecelen
        self._encoding = encoding

        self._buildindex()

    def _filepaths(self):
        """Yield (file dict, relative path, path under rootpath) for each
        file, in order."""

        for curfile in self._files:
            fname = pathlib.Path(
                *(x.decode(self._encoding) for x in curfile['path']))
            curfilepath = self._rootpath / fname

            yield curfile, fname, curfilepath

    def allfiles(self):
        """Yield the path (under rootpath) of every file."""

        for _, _, curfilepath in self._filepaths():
            yield curfilepath

    def _buildindex(self):
        """Populate self._index.

        self._index[n] is a list of dicts describing the chunks that make
        up piece n, in order.  Each dict has the keys 'file' (path under
        rootpath), 'fname' (relative path), 'offset' (byte offset in the
        file) and 'size' (number of bytes).
        """

        self._index = []
        current = None  # replaced before first use, as left starts at 0
        left = 0        # bytes still needed to fill the current piece

        for curfile, fname, curfilepath in self._filepaths():
            length = curfile['length']
            curfileoff = 0

            # Every file contributes at least one chunk, so a zero length
            # file is still recorded (as a chunk of size 0).
            while True:
                if left == 0:
                    # previous piece is full, start a new one
                    current = []
                    self._index.append(current)
                    left = self._piecelen

                sz = min(length - curfileoff, left)

                current.append(dict(file=curfilepath, fname=fname,
                    offset=curfileoff, size=sz))

                curfileoff += sz
                left -= sz

                if curfileoff == length:
                    break

    def filesforpiece(self, idx):
        """Yield the path of each file that piece idx touches.

        Raises IndexError if idx is not a valid piece number.
        """

        for chunk in self._index[idx]:
            yield chunk['file']

    def apply_piece(self, idx, fun):
        """Call fun with the data of piece idx, one call per chunk.

        Each chunk is read from its file at the recorded offset.  If a
        file is shorter than expected, fun receives fewer bytes than the
        recorded size.  Errors from open (e.g. a missing file) propagate.

        Raises IndexError if idx is not a valid piece number.
        """

        for chunk in self._index[idx]:
            with open(chunk['file'], 'rb') as fp:
                fp.seek(chunk['offset'])
                fun(fp.read(chunk['size']))


def validate(torrent, basedir):
    """Check the files of a multi-file torrent against its piece hashes.

    torrent -- decoded torrent dict.  torrent['info'] must have the keys
        'name', 'files', 'piece length' and 'pieces'.  The optional
        torrent['encoding'] names the codec for the file names, and
        defaults to us-ascii.
    basedir -- directory containing the torrent's directory
        (info['name']).

    Returns a tuple (good, bad) of sets of pathlib.Path.  A file is bad
    if any piece that touches it fails its SHA-1 check.
    """

    info = torrent['info']

    basedir = pathlib.Path(basedir)

    try:
        encoding = torrent['encoding'].decode('us-ascii')
    except KeyError:
        encoding = 'us-ascii'

    torrentdir = basedir / info['name'].decode(encoding)

    stor = Storage(torrentdir, info['files'], info['piece length'], encoding)

    pieces = info['pieces']
    piecescnt = len(pieces) // _DIGEST_LEN
    valid = [ None ] * piecescnt

    expected = (pieces[x:x + _DIGEST_LEN]
        for x in range(0, len(pieces), _DIGEST_LEN))
    for num, digest in enumerate(expected):
        hasher = sha1()

        stor.apply_piece(num, hasher.update)

        valid[num] = hasher.digest() == digest

    # if any piece of a file is bad, it's bad
    allfiles = set(stor.allfiles())

    badfiles = { fname for num, ok in enumerate(valid) if not ok
        for fname in stor.filesforpiece(num) }

    return allfiles - badfiles, badfiles


class _TestCases(unittest.TestCase):
    """Tests for validate, run against the torrents in fixtures/."""

    dirname = 'somedir'

    # file name (posix style, relative to dirname) -> expected contents
    origfiledata = {
        'filea.txt': b'foo\n',
        'fileb.txt': b'bar\n',
        'filec.txt': b'bleha\n',
        'filed.txt': b'somehow\n',
        'filee.txt': b'nowab\n',
        'filef/filef.txt': b'\n',
    }

    @staticmethod
    def _load_fixture(name):
        """Return the decoded torrent stored as fixtures/<name>."""

        tor = importlib.resources.files(__name__)
        tor = tor / 'fixtures' / name
        with tor.open('rb') as fp:
            return bencode.bdecode(fp.read())

    def setUp(self):
        # Load the fixture first, so that a failure here does not leave
        # a temporary directory behind.
        self.torrent = self._load_fixture('somedir.torrent')

        d = pathlib.Path(tempfile.mkdtemp()).resolve()

        self.basetempdir = d

        self.oldcwd = os.getcwd()

        os.chdir(d)

    def tearDown(self):
        # Leave the temporary directory before removing it, not all
        # platforms allow removing the current working directory.
        os.chdir(self.oldcwd)

        shutil.rmtree(self.basetempdir)

    @staticmethod
    def make_files(dname, fdict):
        """Create the files in fdict (posix relative name -> bytes) under
        dname, creating parent directories as needed."""

        dname = pathlib.Path(dname)
        for k, v in fdict.items():
            k = dname / pathlib.PurePosixPath(k)
            k.parent.mkdir(parents=True, exist_ok=True)
            with open(k, 'wb') as fp:
                fp.write(v)

    def test_completeverif(self):
        sd = self.basetempdir / self.dirname
        sd.mkdir()

        self.make_files(sd, self.origfiledata)

        validate(self.torrent, self.basetempdir)

        # encoded names
        sd = self.basetempdir / 'thai'
        sd.mkdir()

        self.make_files(sd, { 'thai - สวัสดี.txt': b'hello\n' })

        torrent = self._load_fixture('thai.torrent')

        validate(torrent, self.basetempdir)

    def test_verification(self):
        # Testing for "missing" files
        # piece size 2 (aka 4 bytes)
        # empty file of 4 bytes 'foo\n'
        # complete file of 4 bytes 'bar\n'
        # partial missing file, 6 bytes, last two correct 'bleha\n'
        # complete file of 8 bytes (multiple pieces) 'somehow\n'
        # partial missing file, starting w/ 2 bytes, length 6 'nowab\n'
        # complete file (length 1) '\n'

        missingfiles = self.origfiledata.copy()

        badfiles = {
            'filea.txt': b'',
            'filec.txt': b'\x00\x00\x00\x00a\n',
            'filee.txt': b'no',
        }

        missingfiles.update(badfiles)

        sd = self.basetempdir / self.dirname
        sd.mkdir()

        self.make_files(sd, missingfiles)

        val, inval = validate(self.torrent, self.basetempdir)

        self.assertEqual(set(val), { sd / x for x in
            missingfiles.keys() if x not in badfiles })
        self.assertEqual(set(inval), { sd / x for x in badfiles.keys() })