- from . import bencode
- from hashlib import sha1
- import importlib.resources
- import itertools
- import os
- import pathlib
- import shutil
- import sys
- import tempfile
- import unittest
class Storage:
    """Map the fixed-size pieces of a multi-file torrent onto file spans.

    The files are treated as one contiguous byte stream, in the order
    given, and that stream is cut into pieces of ``piecelen`` bytes.  For
    every piece the index records which byte ranges of which files make
    it up.
    """

    def __init__(self, rootpath, files, piecelen):
        """Build the piece index.

        Args:
            rootpath: Directory that the file paths are relative to.
            files: Iterable of dicts, each with a ``'path'`` entry (a
                sequence of bytes path components, decoded as us-ascii)
                and a ``'length'`` entry (size of the file in bytes).
            piecelen: Number of bytes in each piece; must be positive.

        Raises:
            ValueError: If ``piecelen`` is not positive.
        """
        # A non-positive piece length would make _buildindex loop forever,
        # appending zero-size spans without ever advancing through a file.
        if piecelen <= 0:
            raise ValueError(f'piece length must be positive: {piecelen!r}')

        self._rootpath = pathlib.Path(rootpath)
        self._files = files
        self._piecelen = piecelen
        self._buildindex()

    def _buildindex(self):
        """Populate ``self._index``, one list of span dicts per piece.

        Each span dict has the keys ``file`` (path under the root),
        ``fname`` (path relative to the root), ``offset`` (start position
        within the file) and ``size`` (number of bytes of the span).
        """
        self._index = []
        left = 0  # bytes still missing from the piece being assembled
        current = None  # span list of the piece being assembled

        for entry in self._files:
            fname = pathlib.Path(
                *(x.decode('us-ascii') for x in entry['path']))
            filepath = self._rootpath / fname
            length = entry['length']
            offset = 0

            # Every file contributes at least one span, so a zero-length
            # file is recorded as a single span of size 0.
            while True:
                if left == 0:
                    # Previous piece is full (or this is the first one).
                    current = []
                    self._index.append(current)
                    left = self._piecelen

                size = min(length - offset, left)
                current.append(dict(file=filepath, fname=fname,
                                    offset=offset, size=size))
                offset += size
                left -= size

                if offset == length:
                    break

    def apply_piece(self, idx, fun):
        """Feed the data of piece ``idx`` to ``fun``, span by span.

        ``fun`` is called once per span, in stream order, with the bytes
        read from that span.  A file shorter than the index expects
        yields fewer bytes than the span size; errors raised by
        ``open()`` (for example for a missing file) propagate.

        Args:
            idx: Index of the piece in the piece index.
            fun: Callable accepting a single bytes argument.
        """
        for span in self._index[idx]:
            with open(span['file'], 'rb') as fp:
                fp.seek(span['offset'])
                fun(fp.read(span['size']))
def validate(torrent, basedir):
    """Verify the files of a multi-file torrent against its piece hashes.

    The torrent's files are expected under ``basedir / info['name']``.
    Each piece is read back through ``Storage`` and its SHA-1 digest is
    compared with the corresponding entry of ``info['pieces']``.

    Args:
        torrent: Decoded metainfo dict; ``torrent['info']`` must provide
            ``'name'``, ``'files'``, ``'piece length'`` and ``'pieces'``.
        basedir: Directory containing the torrent's top-level directory.

    Raises:
        ValueError: If the piece length is not positive, if the number of
            piece hashes does not match the total file length, or if a
            piece does not hash to its expected digest.
    """
    info = torrent['info']
    basedir = pathlib.Path(basedir)
    piecelen = info['piece length']
    pieces = info['pieces']
    digestlen = sha1().digest_size

    if piecelen <= 0:
        raise ValueError(f'invalid piece length: {piecelen!r}')

    # Without this check a truncated hash list would let the unverified
    # tail of the data pass validation silently.
    totallen = sum(f['length'] for f in info['files'])
    expected = -(-totallen // piecelen)  # ceiling division
    if len(pieces) != expected * digestlen:
        raise ValueError(
            f'expected {expected} piece hashes ({expected * digestlen} '
            f'bytes), got {len(pieces)} bytes')

    torrentdir = basedir / info['name'].decode('us-ascii')
    stor = Storage(torrentdir, info['files'], piecelen)

    for num, start in enumerate(range(0, len(pieces), digestlen)):
        hasher = sha1()
        stor.apply_piece(num, hasher.update)
        if hasher.digest() != pieces[start:start + digestlen]:
            raise ValueError(f'piece {num} failed hash verification')
class _TestCases(unittest.TestCase):
    """Tests for ``validate`` against the ``somedir.torrent`` fixture."""

    # Name of the torrent's top-level directory inside the temp dir.
    dirname = 'somedir'

    # Complete file contents, keyed by POSIX-style path below ``dirname``.
    origfiledata = {
        'filea.txt': b'foo\n',
        'fileb.txt': b'bar\n',
        'filec.txt': b'bleha\n',
        'filed.txt': b'somehow\n',
        'filee.txt': b'nowab\n',
        'filef/filef.txt': b'\n',
    }

    def setUp(self):
        """Load the fixture torrent and switch into a fresh temp dir."""
        # Load the fixture first so a failure here cannot leak a temp dir.
        tor = importlib.resources.files(__name__)
        tor = tor / 'fixtures' / 'somedir.torrent'
        with tor.open('rb') as fp:
            self.torrent = bencode.bdecode(fp.read())

        self.oldcwd = os.getcwd()
        self.basetempdir = pathlib.Path(tempfile.mkdtemp()).resolve()
        os.chdir(self.basetempdir)

    def tearDown(self):
        """Restore the working directory and remove the temp dir."""
        # Leave the temp dir before deleting it: removing the current
        # working directory fails on some platforms (e.g. Windows).
        os.chdir(self.oldcwd)
        shutil.rmtree(self.basetempdir)

    @staticmethod
    def make_files(dname, fdict):
        """Create the files described by ``fdict`` below ``dname``.

        Args:
            dname: Directory to create the files in.
            fdict: Mapping of POSIX-style relative path to bytes content.
                Missing parent directories are created.
        """
        dname = pathlib.Path(dname)
        for relname, content in fdict.items():
            target = dname / pathlib.PurePosixPath(relname)
            target.parent.mkdir(parents=True, exist_ok=True)
            with open(target, 'wb') as fp:
                fp.write(content)

    def test_completeverif(self):
        """A complete, unmodified file set validates without error."""
        sd = self.basetempdir / self.dirname
        sd.mkdir()

        self.make_files(sd, self.origfiledata)

        validate(self.torrent, self.basetempdir)

    def test_verification(self):
        """Truncated or zeroed files make validation raise ValueError."""
        # Testing for "missing" files
        #  piece size 2 (aka 4 bytes)
        #  empty file of 4 bytes 'foo\n'
        #  complete file of 4 bytes 'bar\n'
        #  partial missing file, 6 bytes, last two correct 'bleha\n'
        #  complete file of 8 bytes (multiple pieces) 'somehow\n'
        #  partial missing file, starting w/ 2 bytes, length 6 'nowab\n'
        #  complete file (length 1) '\n'
        missingfiles = self.origfiledata.copy()
        missingfiles['filea.txt'] = b''
        missingfiles['filec.txt'] = b'\x00\x00\x00\x00a\n'
        missingfiles['filee.txt'] = b'no'

        sd = self.basetempdir / self.dirname
        sd.mkdir()

        self.make_files(sd, missingfiles)

        self.assertRaises(ValueError, validate, self.torrent,
                          self.basetempdir)